18 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/ff54ae664024/ Changeset: ff54ae664024 User: jmchilton Date: 2014-12-16 03:20:11+00:00 Summary: Code de-dup in workflow controller. Affected #: 1 file diff -r 4f767bb3989f2ee9b7e3fdb47fa4cdf716205fc6 -r ff54ae664024274bb876d54930f6c3e8786f629d lib/galaxy/webapps/galaxy/controllers/workflow.py --- a/lib/galaxy/webapps/galaxy/controllers/workflow.py +++ b/lib/galaxy/webapps/galaxy/controllers/workflow.py @@ -666,12 +666,8 @@ encode it as a json string that can be read by the workflow editor web interface. """ - user = trans.get_user() - id = trans.security.decode_id( id ) trans.workflow_building_mode = True - # Load encoded workflow from database - stored = trans.sa_session.query( model.StoredWorkflow ).get( id ) - assert stored.user == user + stored = self.get_stored_workflow( trans, id, check_ownership=True, check_accessible=False ) workflow = stored.latest_workflow # Pack workflow data into a dictionary and return data = {} @@ -898,12 +894,8 @@ """ Exports a workflow to myExperiment website. """ - - # Load encoded workflow from database - id = trans.security.decode_id( id ) trans.workflow_building_mode = True - stored = trans.sa_session.query( model.StoredWorkflow ).get( id ) - self.security_check( trans, stored, False, True ) + stored = self.get_stored_workflow( trans, id, check_ownership=False, check_accessible=True ) # Convert workflow to dict. workflow_dict = self._workflow_to_dict( trans, stored ) https://bitbucket.org/galaxy/galaxy-central/commits/e89cb5a78e20/ Changeset: e89cb5a78e20 User: jmchilton Date: 2014-12-16 03:20:11+00:00 Summary: Refactor helper out for detailed encoding of a workflow for API. Affected #: 2 files diff -r ff54ae664024274bb876d54930f6c3e8786f629d -r e89cb5a78e209b8a9c1209603a357b10194e377e lib/galaxy/webapps/galaxy/api/workflows.py --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -75,44 +75,8 @@ if trans.sa_session.query(trans.app.model.StoredWorkflowUserShareAssociation).filter_by(user=trans.user, stored_workflow=stored_workflow).count() == 0: message = "Workflow is neither importable, nor owned by or shared with current user" raise exceptions.ItemAccessibilityException( message ) - - item = stored_workflow.to_dict( view='element', value_mapper={ 'id': trans.security.encode_id } ) - item['url'] = url_for('workflow', id=id) - item['owner'] = stored_workflow.user.username latest_workflow = stored_workflow.latest_workflow - inputs = {} - for step in latest_workflow.steps: - step_type = step.type - if step_type in ['data_input', 'data_collection_input']: - if step.tool_inputs and "name" in step.tool_inputs: - label = step.tool_inputs['name'] - elif step_type == "data_input": - label = "Input Dataset" - elif step_type == "data_collection_input": - label = "Input Dataset Collection" - else: - raise ValueError("Invalid step_type %s" % step_type) - inputs[step.id] = {'label': label, 'value': ""} - else: - pass - # Eventually, allow regular tool parameters to be inserted and modified at runtime. 
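For context, the de-duplication in the first changeset above funnels both call sites through the mixin's get_stored_workflow helper. A rough sketch of what that helper does, inferred from the two call sites in the diff (the real mixin method may differ in its details):

def get_stored_workflow(self, trans, id, check_ownership=True, check_accessible=False):
    # Decode the encoded id once, in one place (model is galaxy.model, as
    # imported by the controller).
    decoded_id = trans.security.decode_id(id)
    stored = trans.sa_session.query(model.StoredWorkflow).get(decoded_id)
    # One shared security check replaces the ad-hoc `assert stored.user == user`
    # and the inline `self.security_check(trans, stored, False, True)` calls.
    self.security_check(trans, stored, check_ownership, check_accessible)
    return stored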
- # p = step.get_required_parameters() - item['inputs'] = inputs - item['annotation'] = self.get_item_annotation_str( trans.sa_session, stored_workflow.user, stored_workflow ) - steps = {} - for step in latest_workflow.steps: - steps[step.id] = {'id': step.id, - 'type': step.type, - 'tool_id': step.tool_id, - 'tool_version': step.tool_version, - 'annotation': self.get_item_annotation_str( trans.sa_session, stored_workflow.user, step ), - 'tool_inputs': step.tool_inputs, - 'input_steps': {}} - for conn in step.input_connections: - steps[step.id]['input_steps'][conn.input_name] = {'source_step': conn.output_step_id, - 'step_output': conn.output_name} - item['steps'] = steps - return item + return self.__encode_workflow( trans, stored_workflow, latest_workflow ) @expose_api def create(self, trans, payload, **kwd): @@ -512,6 +476,44 @@ ) return self.__encode_invocation_step( trans, invocation_step ) + def __encode_workflow( self, trans, stored_workflow, workflow ): + item = stored_workflow.to_dict( view='element', value_mapper={ 'id': trans.security.encode_id } ) + item['url'] = url_for('workflow', id=item['id']) + item['owner'] = stored_workflow.user.username + inputs = {} + for step in workflow.steps: + step_type = step.type + if step_type in ['data_input', 'data_collection_input']: + if step.tool_inputs and "name" in step.tool_inputs: + label = step.tool_inputs['name'] + elif step_type == "data_input": + label = "Input Dataset" + elif step_type == "data_collection_input": + label = "Input Dataset Collection" + else: + raise ValueError("Invalid step_type %s" % step_type) + inputs[step.id] = {'label': label, 'value': ""} + else: + pass + # Eventually, allow regular tool parameters to be inserted and modified at runtime. + # p = step.get_required_parameters() + item['inputs'] = inputs + item['annotation'] = self.get_item_annotation_str( trans.sa_session, stored_workflow.user, stored_workflow ) + steps = {} + for step in workflow.steps: + steps[step.id] = {'id': step.id, + 'type': step.type, + 'tool_id': step.tool_id, + 'tool_version': step.tool_version, + 'annotation': self.get_item_annotation_str( trans.sa_session, stored_workflow.user, step ), + 'tool_inputs': step.tool_inputs, + 'input_steps': {}} + for conn in step.input_connections: + steps[step.id]['input_steps'][conn.input_name] = {'source_step': conn.output_step_id, + 'step_output': conn.output_name} + item['steps'] = steps + return item + def __encode_invocation_step( self, trans, invocation_step ): return self.encode_all_ids( trans, diff -r ff54ae664024274bb876d54930f6c3e8786f629d -r e89cb5a78e209b8a9c1209603a357b10194e377e test/api/test_workflows.py --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -181,6 +181,13 @@ def setUp( self ): super( WorkflowsApiTestCase, self ).setUp() + def test_show_valid( self ): + workflow_id = self.workflow_populator.simple_workflow( "test_regular" ) + show_response = self._get( "workflows/%s" % workflow_id ) + workflow = show_response.json() + self._assert_looks_like_instance_workflow_representation( workflow ) + assert len(workflow["steps"]) == 3 + def test_show_invalid_key_is_400( self ): show_response = self._get( "workflows/%s" % self._random_key() ) self._assert_status_code_is( show_response, 400 ) @@ -731,4 +738,25 @@ self._assert_status_code_is( show_response, 200 ) return show_response.json() + def _assert_looks_like_instance_workflow_representation(self, workflow): + self._assert_has_keys( + workflow, + 'url', + 'owner', + 'inputs', + 'annotation', + 'steps' + ) + for step 
in workflow["steps"].values(): + self._assert_has_keys( + step, + 'id', + 'type', + 'tool_id', + 'tool_version', + 'annotation', + 'tool_inputs', + 'input_steps', + ) + RunJobsSummary = namedtuple('RunJobsSummary', ['history_id', 'workflow_id', 'inputs', 'jobs']) https://bitbucket.org/galaxy/galaxy-central/commits/448c66ad21e0/ Changeset: 448c66ad21e0 User: jmchilton Date: 2014-12-16 03:20:11+00:00 Summary: Move building workflows from dictionaries into new WorkflowContentsManager. Going to build up a WorkflowContentsManager that is responsible for slightly more complex operations involving step logic, etc... and keep the WorkflowsManager focused on CRUD-like and sharing operations. Affected #: 2 files diff -r e89cb5a78e209b8a9c1209603a357b10194e377e -r 448c66ad21e012c7b18f1edfd7729527dc8cd507 lib/galaxy/managers/workflows.py --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -1,7 +1,18 @@ +from __future__ import absolute_import + +from collections import namedtuple +import json + from galaxy import model from galaxy import exceptions +from galaxy.model.item_attrs import UsesAnnotations from galaxy.workflow import modules +# For WorkflowContentManager +from galaxy.util.sanitize_html import sanitize_html +from galaxy.workflow.steps import attach_ordered_steps +from galaxy.workflow.modules import module_factory + class WorkflowsManager( object ): """ Handle CRUD type operaitons related to workflows. More interesting @@ -103,3 +114,124 @@ ).filter_by( workflow_id=stored_workflow.latest_workflow_id ) + + +CreatedWorkflow = namedtuple("CreatedWorkflow", ["stored_workflow", "missing_tools"]) + + +class WorkflowContentsManager(UsesAnnotations): + + def build_workflow_from_dict( + self, + trans, + data, + source=None, + add_to_menu=False, + publish=False + ): + # Put parameters in workflow mode + trans.workflow_building_mode = True + # Create new workflow from incoming dict + workflow = model.Workflow() + # If there's a source, put it in the workflow name. + if source: + name = "%s (imported from %s)" % ( data['name'], source ) + else: + name = data['name'] + workflow.name = name + if 'uuid' in data: + workflow.uuid = data['uuid'] + # Assume no errors until we find a step that has some + workflow.has_errors = False + # Create each step + steps = [] + # The editor will provide ids for each step that we don't need to save, + # but do need to use to make connections + steps_by_external_id = {} + # Keep track of tools required by the workflow that are not available in + # the local Galaxy instance. Each tuple in the list of missing_tool_tups + # will be ( tool_id, tool_name, tool_version ). + missing_tool_tups = [] + supplied_steps = data[ 'steps' ] + # Try to iterate through imported workflow in such a way as to + # preserve step order. + step_indices = supplied_steps.keys() + try: + step_indices = sorted( step_indices, key=int ) + except ValueError: + # to defensive, were these ever or will they ever not be integers?
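The sorted(step_indices, key=int) call above exists because the supplied steps arrive as a JSON object whose keys are strings; a plain lexicographic sort would scramble any workflow with ten or more steps:

# JSON object keys are strings, so numeric order must be forced explicitly.
indices = ["0", "1", "10", "2", "11"]
sorted(indices)           # ['0', '1', '10', '11', '2'] -- lexicographic, wrong
sorted(indices, key=int)  # ['0', '1', '2', '10', '11'] -- numeric, preserves step order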
+ pass + # First pass to build step objects and populate basic values + for step_index in step_indices: + step_dict = supplied_steps[ step_index ] + # Create the model class for the step + step = model.WorkflowStep() + steps.append( step ) + steps_by_external_id[ step_dict['id' ] ] = step + # FIXME: Position should be handled inside module + step.position = step_dict['position'] + module = module_factory.from_dict( trans, step_dict, secure=False ) + module.save_to_step( step ) + if module.type == 'tool' and module.tool is None: + # A required tool is not available in the local Galaxy instance. + missing_tool_tup = ( step_dict[ 'tool_id' ], step_dict[ 'name' ], step_dict[ 'tool_version' ] ) + if missing_tool_tup not in missing_tool_tups: + missing_tool_tups.append( missing_tool_tup ) + # Save the entire step_dict in the unused config field, be parsed later + # when we do have the tool + step.config = json.dumps(step_dict) + if step.tool_errors: + workflow.has_errors = True + # Stick this in the step temporarily + step.temp_input_connections = step_dict['input_connections'] + # Save step annotation. + annotation = step_dict[ 'annotation' ] + if annotation: + annotation = sanitize_html( annotation, 'utf-8', 'text/html' ) + self.add_item_annotation( trans.sa_session, trans.get_user(), step, annotation ) + # Second pass to deal with connections between steps + for step in steps: + # Input connections + for input_name, conn_list in step.temp_input_connections.iteritems(): + if not conn_list: + continue + if not isinstance(conn_list, list): # Older style singleton connection + conn_list = [conn_list] + for conn_dict in conn_list: + conn = model.WorkflowStepConnection() + conn.input_step = step + conn.input_name = input_name + conn.output_name = conn_dict['output_name'] + conn.output_step = steps_by_external_id[ conn_dict['id'] ] + del step.temp_input_connections + + # Order the steps if possible + attach_ordered_steps( workflow, steps ) + + # Connect up + stored = model.StoredWorkflow() + stored.name = workflow.name + workflow.stored_workflow = stored + stored.latest_workflow = workflow + stored.user = trans.user + stored.published = publish + if data[ 'annotation' ]: + annotation = sanitize_html( data[ 'annotation' ], 'utf-8', 'text/html' ) + self.add_item_annotation( trans.sa_session, stored.user, stored, annotation ) + + # Persist + trans.sa_session.add( stored ) + trans.sa_session.flush() + + if add_to_menu: + if trans.user.stored_workflow_menu_entries is None: + trans.user.stored_workflow_menu_entries = [] + menuEntry = model.StoredWorkflowMenuEntry() + menuEntry.stored_workflow = stored + trans.user.stored_workflow_menu_entries.append( menuEntry ) + trans.sa_session.flush() + + return CreatedWorkflow( + stored_workflow=stored, + missing_tools=missing_tool_tups + ) diff -r e89cb5a78e209b8a9c1209603a357b10194e377e -r 448c66ad21e012c7b18f1edfd7729527dc8cd507 lib/galaxy/web/base/controller.py --- a/lib/galaxy/web/base/controller.py +++ b/lib/galaxy/web/base/controller.py @@ -38,14 +38,13 @@ from galaxy.managers import api_keys from galaxy.managers import tags +from galaxy.managers import workflows from galaxy.managers import base as managers_base from galaxy.datatypes.metadata import FileParameter from galaxy.tools.parameters import visit_input_values from galaxy.tools.parameters.basic import DataToolParameter from galaxy.tools.parameters.basic import DataCollectionToolParameter -from galaxy.util.json import dumps from galaxy.workflow.modules import ToolModule -from galaxy.workflow.steps import 
attach_ordered_steps from galaxy.util import validation @@ -1576,110 +1575,15 @@ """ Creates a workflow from a dict. Created workflow is stored in the database and returned. """ - - # Put parameters in workflow mode - trans.workflow_building_mode = True - # Create new workflow from incoming dict - workflow = model.Workflow() - # If there's a source, put it in the workflow name. - if source: - name = "%s (imported from %s)" % ( data['name'], source ) - else: - name = data['name'] - workflow.name = name - if 'uuid' in data: - workflow.uuid = data['uuid'] - # Assume no errors until we find a step that has some - workflow.has_errors = False - # Create each step - steps = [] - # The editor will provide ids for each step that we don't need to save, - # but do need to use to make connections - steps_by_external_id = {} - # Keep track of tools required by the workflow that are not available in - # the local Galaxy instance. Each tuple in the list of missing_tool_tups - # will be ( tool_id, tool_name, tool_version ). - missing_tool_tups = [] - supplied_steps = data[ 'steps' ] - # Try to iterate through imported workflow in such a way as to - # preserve step order. - step_indices = supplied_steps.keys() - try: - step_indices = sorted( step_indices, key=int ) - except ValueError: - # to defensive, were these ever or will they ever not be integers? - pass - # First pass to build step objects and populate basic values - for step_index in step_indices: - step_dict = supplied_steps[ step_index ] - # Create the model class for the step - step = model.WorkflowStep() - steps.append( step ) - steps_by_external_id[ step_dict['id' ] ] = step - # FIXME: Position should be handled inside module - step.position = step_dict['position'] - module = module_factory.from_dict( trans, step_dict, secure=False ) - module.save_to_step( step ) - if module.type == 'tool' and module.tool is None: - # A required tool is not available in the local Galaxy instance. - missing_tool_tup = ( step_dict[ 'tool_id' ], step_dict[ 'name' ], step_dict[ 'tool_version' ] ) - if missing_tool_tup not in missing_tool_tups: - missing_tool_tups.append( missing_tool_tup ) - # Save the entire step_dict in the unused config field, be parsed later - # when we do have the tool - step.config = dumps(step_dict) - if step.tool_errors: - workflow.has_errors = True - # Stick this in the step temporarily - step.temp_input_connections = step_dict['input_connections'] - # Save step annotation. 
- annotation = step_dict[ 'annotation' ] - if annotation: - annotation = sanitize_html( annotation, 'utf-8', 'text/html' ) - self.add_item_annotation( trans.sa_session, trans.get_user(), step, annotation ) - # Second pass to deal with connections between steps - for step in steps: - # Input connections - for input_name, conn_list in step.temp_input_connections.iteritems(): - if not conn_list: - continue - if not isinstance(conn_list, list): # Older style singleton connection - conn_list = [conn_list] - for conn_dict in conn_list: - conn = model.WorkflowStepConnection() - conn.input_step = step - conn.input_name = input_name - conn.output_name = conn_dict['output_name'] - conn.output_step = steps_by_external_id[ conn_dict['id'] ] - del step.temp_input_connections - - # Order the steps if possible - attach_ordered_steps( workflow, steps ) - - # Connect up - stored = model.StoredWorkflow() - stored.name = workflow.name - workflow.stored_workflow = stored - stored.latest_workflow = workflow - stored.user = trans.user - stored.published = publish - if data[ 'annotation' ]: - annotation = sanitize_html( data[ 'annotation' ], 'utf-8', 'text/html' ) - self.add_item_annotation( trans.sa_session, stored.user, stored, annotation ) - - # Persist - trans.sa_session.add( stored ) - trans.sa_session.flush() - - if add_to_menu: - if trans.user.stored_workflow_menu_entries == None: - trans.user.stored_workflow_menu_entries = [] - menuEntry = model.StoredWorkflowMenuEntry() - menuEntry.stored_workflow = stored - trans.user.stored_workflow_menu_entries.append( menuEntry ) - trans.sa_session.flush() - - return stored, missing_tool_tups + workflow_contents_manager = workflows.WorkflowContentsManager() + created_workflow = workflow_contents_manager.build_workflow_from_dict( + trans, + data, + source=source, + add_to_menu=add_to_menu, + publish=publish + ) + return created_workflow.stored_workflow, created_workflow.missing_tools def _workflow_to_dict( self, trans, stored ): """ https://bitbucket.org/galaxy/galaxy-central/commits/a81fd56c879d/ Changeset: a81fd56c879d User: jmchilton Date: 2014-12-16 03:20:11+00:00 Summary: Refactor updating workflow contents out of mixin into manager. Introduce an API endpoint for this operation with test case. 
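The endpoint defined in full below accepts a payload whose 'workflow' key holds the same JSON produced by GET workflows/<id>/download. A minimal client-side sketch of the round trip, mirroring the test helpers; the host, encoded id, and API key are placeholders:

import json
from requests import get, put

base = "http://localhost:8080/api"
key = "<api_key>"                  # placeholder
workflow_id = "f2db41e1fa331b3e"   # placeholder encoded id

# Fetch the current description, tweak it, and PUT it back.
workflow = get("%s/workflows/%s/download" % (base, workflow_id), params={"key": key}).json()
workflow["steps"]["0"]["position"] = {"top": 1, "left": 1}
response = put("%s/workflows/%s" % (base, workflow_id), data=json.dumps({"workflow": workflow}), params={"key": key})
assert response.status_code == 200  # body is the updated workflow's serialization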
Affected #: 5 files diff -r 448c66ad21e012c7b18f1edfd7729527dc8cd507 -r a81fd56c879dc517dd61e6bcd9bae0b9dd5bca7b lib/galaxy/managers/workflows.py --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -11,7 +11,7 @@ # For WorkflowContentManager from galaxy.util.sanitize_html import sanitize_html from galaxy.workflow.steps import attach_ordered_steps -from galaxy.workflow.modules import module_factory +from galaxy.workflow.modules import module_factory, is_tool_module_type class WorkflowsManager( object ): @@ -235,3 +235,86 @@ stored_workflow=stored, missing_tools=missing_tool_tups ) + + def update_workflow_from_dict(self, trans, stored_workflow, workflow_data, from_editor=False): + # Put parameters in workflow mode + trans.workflow_building_mode = True + # Convert incoming workflow data from json if coming from editor + data = json.loads(workflow_data) if from_editor else workflow_data + # Create new workflow from incoming data + workflow = model.Workflow() + # Just keep the last name (user can rename later) + workflow.name = stored_workflow.name + # Assume no errors until we find a step that has some + workflow.has_errors = False + # Create each step + steps = [] + # The editor will provide ids for each step that we don't need to save, + # but do need to use to make connections + steps_by_external_id = {} + errors = [] + for key, step_dict in data['steps'].iteritems(): + is_tool = is_tool_module_type( step_dict[ 'type' ] ) + if is_tool and step_dict['tool_id'] not in trans.app.toolbox.tools_by_id: + errors.append("Step %s requires tool '%s'." % (step_dict['id'], step_dict['tool_id'])) + if errors: + raise MissingToolsException(workflow, errors) + + # First pass to build step objects and populate basic values + for key, step_dict in data['steps'].iteritems(): + # Create the model class for the step + step = model.WorkflowStep() + steps.append( step ) + steps_by_external_id[ step_dict['id' ] ] = step + # FIXME: Position should be handled inside module + step.position = step_dict['position'] + module = module_factory.from_dict( trans, step_dict, secure=from_editor ) + module.save_to_step( step ) + if 'workflow_outputs' in step_dict: + for output_name in step_dict['workflow_outputs']: + m = model.WorkflowOutput(workflow_step=step, output_name=output_name) + trans.sa_session.add(m) + if step.tool_errors: + # DBTODO Check for conditional inputs here. + workflow.has_errors = True + # Stick this in the step temporarily + step.temp_input_connections = step_dict['input_connections'] + # Save step annotation. 
+ annotation = step_dict[ 'annotation' ] + if annotation: + annotation = sanitize_html( annotation, 'utf-8', 'text/html' ) + self.add_item_annotation( trans.sa_session, trans.get_user(), step, annotation ) + # Second pass to deal with connections between steps + for step in steps: + # Input connections + for input_name, conns in step.temp_input_connections.iteritems(): + if conns: + conn_dicts = conns if isinstance(conns, list) else [ conns ] + for conn_dict in conn_dicts: + conn = model.WorkflowStepConnection() + conn.input_step = step + conn.input_name = input_name + conn.output_name = conn_dict['output_name'] + conn.output_step = steps_by_external_id[ conn_dict['id'] ] + del step.temp_input_connections + # Order the steps if possible + attach_ordered_steps( workflow, steps ) + # Connect up + workflow.stored_workflow = stored_workflow + stored_workflow.latest_workflow = workflow + # Persist + trans.sa_session.flush() + # Return something informative + errors = [] + if workflow.has_errors: + errors.append( "Some steps in this workflow have validation errors" ) + if workflow.has_cycles: + errors.append( "This workflow contains cycles" ) + return workflow, errors + + +class MissingToolsException(Exception): + + def __init__(self, workflow, errors): + self.workflow = workflow + self.errors = errors diff -r 448c66ad21e012c7b18f1edfd7729527dc8cd507 -r a81fd56c879dc517dd61e6bcd9bae0b9dd5bca7b lib/galaxy/web/base/controller.py --- a/lib/galaxy/web/base/controller.py +++ b/lib/galaxy/web/base/controller.py @@ -1575,6 +1575,7 @@ """ Creates a workflow from a dict. Created workflow is stored in the database and returned. """ + # TODO: replace this method with direct access to manager. workflow_contents_manager = workflows.WorkflowContentsManager() created_workflow = workflow_contents_manager.build_workflow_from_dict( trans, diff -r 448c66ad21e012c7b18f1edfd7729527dc8cd507 -r a81fd56c879dc517dd61e6bcd9bae0b9dd5bca7b lib/galaxy/webapps/galaxy/api/workflows.py --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -271,6 +271,38 @@ """ return self.__api_import_new_workflow( trans, payload, **kwd ) + @expose_api + def update( self, trans, id, payload, **kwds ): + """ + * PUT /api/workflows/{id} + updates the workflow stored with ``id`` + + :type id: str + :param id: the encoded id of the workflow to update + :type payload: dict + :param payload: a dictionary containing any or all of the + * workflow the json description of the workflow as would be + produced by GET workflows/<id>/download or + given to `POST workflows` + + The workflow contents will be updated to target + this. + :rtype: dict + :returns: serialized version of the workflow + """ + stored_workflow = self.__get_stored_workflow( trans, id ) + if 'workflow' in payload: + workflow_contents_manager = workflows.WorkflowContentsManager() + workflow, errors = workflow_contents_manager.update_workflow_from_dict( + trans, + stored_workflow, + payload['workflow'], + ) + else: + message = "Updating workflow requires dictionary containing 'workflow' attribute with new JSON description."
+ raise exceptions.RequestParameterInvalidException( message ) + return self.__encode_workflow( trans, stored_workflow, workflow ) + def __api_import_new_workflow( self, trans, payload, **kwd ): data = payload['workflow'] diff -r 448c66ad21e012c7b18f1edfd7729527dc8cd507 -r a81fd56c879dc517dd61e6bcd9bae0b9dd5bca7b lib/galaxy/webapps/galaxy/controllers/workflow.py --- a/lib/galaxy/webapps/galaxy/controllers/workflow.py +++ b/lib/galaxy/webapps/galaxy/controllers/workflow.py @@ -22,7 +22,8 @@ from galaxy.web import error, url_for from galaxy.web.base.controller import BaseUIController, SharableMixin, UsesStoredWorkflowMixin from galaxy.web.framework.formbuilder import form -from galaxy.web.framework.helpers import grids, time_ago, to_unicode +from galaxy.web.framework.helpers import grids, time_ago, to_unicode, escape +from galaxy.managers import workflows from galaxy.workflow.modules import WorkflowModuleInjector from galaxy.workflow.modules import MissingToolException from galaxy.workflow.modules import module_factory, is_tool_module_type @@ -796,76 +797,21 @@ """ # Get the stored workflow stored = self.get_stored_workflow( trans, id ) - # Put parameters in workflow mode - trans.workflow_building_mode = True - # Convert incoming workflow data from json - data = json.loads( workflow_data ) - # Create new workflow from incoming data - workflow = model.Workflow() - # Just keep the last name (user can rename later) - workflow.name = stored.name - # Assume no errors until we find a step that has some - workflow.has_errors = False - # Create each step - steps = [] - # The editor will provide ids for each step that we don't need to save, - # but do need to use to make connections - steps_by_external_id = {} - errors = [] - for key, step_dict in data['steps'].iteritems(): - is_tool = is_tool_module_type( step_dict[ 'type' ] ) - if is_tool and step_dict['tool_id'] not in trans.app.toolbox.tools_by_id: - errors.append("Step %s requires tool '%s'." % (step_dict['id'], step_dict['tool_id'])) - if errors: - return dict( name=workflow.name, - message="This workflow includes missing or invalid tools. It cannot be saved until the following steps are removed or the missing tools are enabled.", - errors=errors ) - # First pass to build step objects and populate basic values - for key, step_dict in data['steps'].iteritems(): - # Create the model class for the step - step = model.WorkflowStep() - steps.append( step ) - steps_by_external_id[ step_dict['id' ] ] = step - # FIXME: Position should be handled inside module - step.position = step_dict['position'] - module = module_factory.from_dict( trans, step_dict ) - module.save_to_step( step ) - if 'workflow_outputs' in step_dict: - for output_name in step_dict['workflow_outputs']: - m = model.WorkflowOutput(workflow_step=step, output_name=output_name) - trans.sa_session.add(m) - if step.tool_errors: - # DBTODO Check for conditional inputs here. - workflow.has_errors = True - # Stick this in the step temporarily - step.temp_input_connections = step_dict['input_connections'] - # Save step annotation. 
- annotation = step_dict[ 'annotation' ] - if annotation: - annotation = sanitize_html( annotation, 'utf-8', 'text/html' ) - self.add_item_annotation( trans.sa_session, trans.get_user(), step, annotation ) - # Second pass to deal with connections between steps - for step in steps: - # Input connections - for input_name, conns in step.temp_input_connections.iteritems(): - if conns: - conn_dicts = conns if isinstance(conns, list) else [ conns ] - for conn_dict in conn_dicts: - conn = model.WorkflowStepConnection() - conn.input_step = step - conn.input_name = input_name - conn.output_name = conn_dict['output_name'] - conn.output_step = steps_by_external_id[ conn_dict['id'] ] - del step.temp_input_connections - # Order the steps if possible - attach_ordered_steps( workflow, steps ) - # Connect up - workflow.stored_workflow = stored - stored.latest_workflow = workflow - # Persist - trans.sa_session.flush() - # Return something informative - errors = [] + workflow_contents_manager = workflows.WorkflowContentsManager() + try: + workflow, errors = workflow_contents_manager.update_workflow_from_dict( + trans, + stored, + workflow_data, + from_editor=True, + ) + except workflows.MissingToolsException as e: + return dict( + name=e.workflow.name, + message="This workflow includes missing or invalid tools. It cannot be saved until the following steps are removed or the missing tools are enabled.", + errors=e.errors, + ) + if workflow.has_errors: errors.append( "Some steps in this workflow have validation errors" ) if workflow.has_cycles: diff -r 448c66ad21e012c7b18f1edfd7729527dc8cd507 -r a81fd56c879dc517dd61e6bcd9bae0b9dd5bca7b test/api/test_workflows.py --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -226,9 +226,9 @@ def test_upload_deprecated( self ): self.__test_upload( use_deprecated_route=True ) - def __test_upload( self, use_deprecated_route ): + def __test_upload( self, use_deprecated_route=False, name="test_import" ): data = dict( - workflow=dumps( self.workflow_populator.load_workflow( name="test_import" ) ), + workflow=dumps( self.workflow_populator.load_workflow( name=name ) ), ) if use_deprecated_route: route = "workflows/upload" @@ -236,7 +236,51 @@ route = "workflows" upload_response = self._post( route, data=data ) self._assert_status_code_is( upload_response, 200 ) - self._assert_user_has_workflow_with_name( "test_import (imported from API)" ) + self._assert_user_has_workflow_with_name( "%s (imported from API)" % name ) + return upload_response + + def test_update( self ): + original_workflow = self.workflow_populator.load_workflow( name="test_import" ) + + upload_response = self.__test_upload( ) + workflow_id = upload_response.json()["id"] + + def update(workflow_object): + data = dict( + workflow=workflow_object + ) + raw_url = 'workflows/%s' % workflow_id + url = self._api_url( raw_url, use_key=True ) + put_response = put( url, data=dumps(data) ) + self._assert_status_code_is( put_response, 200 ) + return put_response + + workflow_content = self._download_workflow(workflow_id) + steps = workflow_content["steps"] + + def tweak_step(step): + assert step['position']['top'] != 1 + assert step['position']['left'] != 1 + step['position'] = {'top': 1, 'left': 1} + + map(tweak_step, steps.values()) + + update(workflow_content) + + def check_step(step): + assert step['position']['top'] == 1 + assert step['position']['left'] == 1 + + updated_workflow_content = self._download_workflow(workflow_id) + map(check_step, updated_workflow_content['steps'].values()) + + # 
Re-update against original workflow... + update(original_workflow) + + updated_workflow_content = self._download_workflow(workflow_id) + + # Make sure the positions have been updated. + map(tweak_step, updated_workflow_content['steps'].values()) def test_import_deprecated( self ): workflow_id = self.workflow_populator.simple_workflow( "test_import_published_deprecated", publish=True ) https://bitbucket.org/galaxy/galaxy-central/commits/2647ac82f98d/ Changeset: 2647ac82f98d User: jmchilton Date: 2014-12-16 03:20:11+00:00 Summary: Move body of _workflow_to_dict to new WorkflowContentsManager. Getting closer to killing UsesStoredWorkflowMixin. Affected #: 2 files diff -r a81fd56c879dc517dd61e6bcd9bae0b9dd5bca7b -r 2647ac82f98dcd90a133be765122f97d6dede45e lib/galaxy/managers/workflows.py --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -11,7 +11,9 @@ # For WorkflowContentManager from galaxy.util.sanitize_html import sanitize_html from galaxy.workflow.steps import attach_ordered_steps -from galaxy.workflow.modules import module_factory, is_tool_module_type +from galaxy.workflow.modules import module_factory, is_tool_module_type, ToolModule +from galaxy.tools.parameters.basic import DataToolParameter, DataCollectionToolParameter +from galaxy.tools.parameters import visit_input_values class WorkflowsManager( object ): @@ -312,6 +314,108 @@ errors.append( "This workflow contains cycles" ) return workflow, errors + def workflow_to_dict(self, trans, stored, style="export" ): + """ + """ + workflow = stored.latest_workflow + workflow_annotation = self.get_item_annotation_obj( trans.sa_session, trans.user, stored ) + annotation_str = "" + if workflow_annotation: + annotation_str = workflow_annotation.annotation + # Pack workflow data into a dictionary and return + data = {} + data['a_galaxy_workflow'] = 'true' # Placeholder for identifying galaxy workflow + data['format-version'] = "0.1" + data['name'] = workflow.name + data['annotation'] = annotation_str + if workflow.uuid is not None: + data['uuid'] = str(workflow.uuid) + data['steps'] = {} + # For each step, rebuild the form and encode the state + for step in workflow.steps: + # Load from database representation + module = module_factory.from_workflow_step( trans, step ) + if not module: + return None + # Get user annotation. + step_annotation = self.get_item_annotation_obj(trans.sa_session, trans.user, step ) + annotation_str = "" + if step_annotation: + annotation_str = step_annotation.annotation + # Step info + step_dict = { + 'id': step.order_index, + 'type': module.type, + 'tool_id': module.get_tool_id(), + 'tool_version': step.tool_version, + 'name': module.get_name(), + 'tool_state': module.get_state( secure=False ), + 'tool_errors': module.get_errors(), + ## 'data_inputs': module.get_data_inputs(), + ## 'data_outputs': module.get_data_outputs(), + 'annotation': annotation_str + } + # Add post-job actions to step dict.
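The post-job-action block that follows keys each action by action_type concatenated with output_name. For a tool step with a single rename action the resulting mapping looks roughly like this (the action values are illustrative):

step_dict['post_job_actions'] = {
    'RenameDatasetActionout_file1': {
        'action_type': 'RenameDatasetAction',
        'output_name': 'out_file1',
        'action_arguments': {'newname': 'filtered.bed'},
    },
}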
+ if module.type == 'tool': + pja_dict = {} + for pja in step.post_job_actions: + pja_dict[pja.action_type + pja.output_name] = dict( action_type=pja.action_type, + output_name=pja.output_name, + action_arguments=pja.action_arguments ) + step_dict[ 'post_job_actions' ] = pja_dict + # Data inputs + step_dict['inputs'] = module.get_runtime_input_dicts( annotation_str ) + # User outputs + step_dict['user_outputs'] = [] + + # All step outputs + step_dict['outputs'] = [] + if type( module ) is ToolModule: + for output in module.get_data_outputs(): + step_dict['outputs'].append( { 'name': output['name'], 'type': output['extensions'][0] } ) + + # Connections + input_connections = step.input_connections + if step.type is None or step.type == 'tool': + # Determine full (prefixed) names of valid input datasets + data_input_names = {} + + def callback( input, value, prefixed_name, prefixed_label ): + if isinstance( input, DataToolParameter ) or isinstance( input, DataCollectionToolParameter ): + data_input_names[ prefixed_name ] = True + # FIXME: this updates modules silently right now; messages from updates should be provided. + module.check_and_update_state() + visit_input_values( module.tool.inputs, module.state.inputs, callback ) + # Filter + # FIXME: this removes connection without displaying a message currently! + input_connections = [ conn for conn in input_connections if conn.input_name in data_input_names ] + + # Encode input connections as dictionary + input_conn_dict = {} + unique_input_names = set( [conn.input_name for conn in input_connections] ) + for input_name in unique_input_names: + input_conn_dict[ input_name ] = \ + [ dict( id=conn.output_step.order_index, output_name=conn.output_name ) for conn in input_connections if conn.input_name == input_name ] + + # Preserve backward compatability. Previously Galaxy + # assumed input connections would be dictionaries not + # lists of dictionaries, so replace any singleton list + # with just the dictionary so that workflows exported from + # newer Galaxy instances can be used with older Galaxy + # instances if they do no include multiple input + # tools. This should be removed at some point. Mirrored + # hack in _workflow_from_dict should never be removed so + # existing workflow exports continue to function. + for input_name, input_conn in dict(input_conn_dict).iteritems(): + if len(input_conn) == 1: + input_conn_dict[input_name] = input_conn[0] + step_dict['input_connections'] = input_conn_dict + # Position + step_dict['position'] = step.position + # Add to return value + data['steps'][step.order_index] = step_dict + return data + class MissingToolsException(object): diff -r a81fd56c879dc517dd61e6bcd9bae0b9dd5bca7b -r 2647ac82f98dcd90a133be765122f97d6dede45e lib/galaxy/web/base/controller.py --- a/lib/galaxy/web/base/controller.py +++ b/lib/galaxy/web/base/controller.py @@ -1590,111 +1590,11 @@ """ Converts a workflow to a dict of attributes suitable for exporting. 
""" - workflow = stored.latest_workflow - workflow_annotation = self.get_item_annotation_obj( trans.sa_session, trans.user, stored ) - annotation_str = "" - if workflow_annotation: - annotation_str = workflow_annotation.annotation - # Pack workflow data into a dictionary and return - data = {} - data['a_galaxy_workflow'] = 'true' # Placeholder for identifying galaxy workflow - data['format-version'] = "0.1" - data['name'] = workflow.name - data['annotation'] = annotation_str - if workflow.uuid is not None: - data['uuid'] = str(workflow.uuid) - data['steps'] = {} - # For each step, rebuild the form and encode the state - for step in workflow.steps: - # Load from database representation - module = module_factory.from_workflow_step( trans, step ) - if not module: - return None - # Get user annotation. - step_annotation = self.get_item_annotation_obj(trans.sa_session, trans.user, step ) - annotation_str = "" - if step_annotation: - annotation_str = step_annotation.annotation - # Step info - step_dict = { - 'id': step.order_index, - 'type': module.type, - 'tool_id': module.get_tool_id(), - 'tool_version' : step.tool_version, - 'name': module.get_name(), - 'tool_state': module.get_state( secure=False ), - 'tool_errors': module.get_errors(), - ## 'data_inputs': module.get_data_inputs(), - ## 'data_outputs': module.get_data_outputs(), - 'annotation' : annotation_str - } - # Add post-job actions to step dict. - if module.type == 'tool': - pja_dict = {} - for pja in step.post_job_actions: - pja_dict[pja.action_type+pja.output_name] = dict( action_type = pja.action_type, - output_name = pja.output_name, - action_arguments = pja.action_arguments ) - step_dict[ 'post_job_actions' ] = pja_dict - # Data inputs - step_dict['inputs'] = module.get_runtime_input_dicts( annotation_str ) - # User outputs - step_dict['user_outputs'] = [] -# module_outputs = module.get_data_outputs() -# step_outputs = trans.sa_session.query( WorkflowOutput ).filter( step=step ) -# for output in step_outputs: -# name = output.output_name -# annotation = "" -# for module_output in module_outputs: -# if module_output.get( 'name', None ) == name: -# output_type = module_output.get( 'extension', '' ) -# break -# data['outputs'][name] = { 'name' : name, 'annotation' : annotation, 'type' : output_type } - - # All step outputs - step_dict['outputs'] = [] - if type( module ) is ToolModule: - for output in module.get_data_outputs(): - step_dict['outputs'].append( { 'name' : output['name'], 'type' : output['extensions'][0] } ) - # Connections - input_connections = step.input_connections - if step.type is None or step.type == 'tool': - # Determine full (prefixed) names of valid input datasets - data_input_names = {} - - def callback( input, value, prefixed_name, prefixed_label ): - if isinstance( input, DataToolParameter ) or isinstance( input, DataCollectionToolParameter ): - data_input_names[ prefixed_name ] = True - # FIXME: this updates modules silently right now; messages from updates should be provided. - module.check_and_update_state() - visit_input_values( module.tool.inputs, module.state.inputs, callback ) - # Filter - # FIXME: this removes connection without displaying a message currently! 
- input_connections = [ conn for conn in input_connections if conn.input_name in data_input_names ] - # Encode input connections as dictionary - input_conn_dict = {} - unique_input_names = set( [conn.input_name for conn in input_connections] ) - for input_name in unique_input_names: - input_conn_dict[ input_name ] = \ - [ dict( id=conn.output_step.order_index, output_name=conn.output_name ) for conn in input_connections if conn.input_name == input_name ] - # Preserve backward compatability. Previously Galaxy - # assumed input connections would be dictionaries not - # lists of dictionaries, so replace any singleton list - # with just the dictionary so that workflows exported from - # newer Galaxy instances can be used with older Galaxy - # instances if they do no include multiple input - # tools. This should be removed at some point. Mirrored - # hack in _workflow_from_dict should never be removed so - # existing workflow exports continue to function. - for input_name, input_conn in dict(input_conn_dict).iteritems(): - if len(input_conn) == 1: - input_conn_dict[input_name] = input_conn[0] - step_dict['input_connections'] = input_conn_dict - # Position - step_dict['position'] = step.position - # Add to return value - data['steps'][step.order_index] = step_dict - return data + workflow_contents_manager = workflows.WorkflowContentsManager() + return workflow_contents_manager.workflow_to_dict( + trans, + stored, + ) class UsesFormDefinitionsMixin: https://bitbucket.org/galaxy/galaxy-central/commits/b05becef6c64/ Changeset: b05becef6c64 User: jmchilton Date: 2014-12-16 03:20:11+00:00 Summary: Move more workflow logic out of controller into manager. This is the logic for building up the editor representation of the workflow. Introduce the concept of a workflow-to-dict style - with 'export' and 'editor' as first cracks. Affected #: 4 files diff -r 2647ac82f98dcd90a133be765122f97d6dede45e -r b05becef6c64a4eb63bdc814a21243f2f5af799c lib/galaxy/managers/workflows.py --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -14,6 +14,7 @@ from galaxy.workflow.modules import module_factory, is_tool_module_type, ToolModule from galaxy.tools.parameters.basic import DataToolParameter, DataCollectionToolParameter from galaxy.tools.parameters import visit_input_values +from galaxy.web import url_for class WorkflowsManager( object ): @@ -314,8 +315,143 @@ errors.append( "This workflow contains cycles" ) return workflow, errors - def workflow_to_dict(self, trans, stored, style="export" ): + def workflow_to_dict( self, trans, stored, style="export" ): + """ Export the workflow contents to a dictionary ready for JSON-ification and to be + sent out via API for instance. There are two styles of export allowed: 'editor' and 'export'. + The Galaxy team will do its best to preserve the backward compatibility of the 'export' style - + but the 'editor' style is subject to rapid and unannounced changes.
""" + if style == "editor": + return self._workflow_to_dict_editor( trans, stored ) + else: + return self._workflow_to_dict_export( trans, stored ) + + def _workflow_to_dict_editor(self, trans, stored): + """ + """ + workflow = stored.latest_workflow + # Pack workflow data into a dictionary and return + data = {} + data['name'] = workflow.name + data['steps'] = {} + data['upgrade_messages'] = {} + # For each step, rebuild the form and encode the state + for step in workflow.steps: + # Load from database representation + module = module_factory.from_workflow_step( trans, step ) + if not module: + step_annotation = self.get_item_annotation_obj( trans.sa_session, trans.user, step ) + annotation_str = "" + if step_annotation: + annotation_str = step_annotation.annotation + invalid_tool_form_html = """<div class="toolForm tool-node-error"><div class="toolFormTitle form-row-error">Unrecognized Tool: %s</div><div class="toolFormBody"><div class="form-row"> + The tool id '%s' for this tool is unrecognized.<br/><br/>To save this workflow, you will need to delete this step or enable the tool. + </div></div></div>""" % (step.tool_id, step.tool_id) + step_dict = { + 'id': step.order_index, + 'type': 'invalid', + 'tool_id': step.tool_id, + 'name': 'Unrecognized Tool: %s' % step.tool_id, + 'tool_state': None, + 'tooltip': None, + 'tool_errors': ["Unrecognized Tool Id: %s" % step.tool_id], + 'data_inputs': [], + 'data_outputs': [], + 'form_html': invalid_tool_form_html, + 'annotation': annotation_str, + 'input_connections': {}, + 'post_job_actions': {}, + 'workflow_outputs': [] + } + # Position + step_dict['position'] = step.position + # Add to return value + data['steps'][step.order_index] = step_dict + continue + # Fix any missing parameters + upgrade_message = module.check_and_update_state() + if upgrade_message: + # FIXME: Frontend should be able to handle workflow messages + # as a dictionary not just the values + data['upgrade_messages'][step.order_index] = upgrade_message.values() + # Get user annotation. 
+ step_annotation = self.get_item_annotation_obj( trans.sa_session, trans.user, step ) + annotation_str = "" + if step_annotation: + annotation_str = step_annotation.annotation + # Pack attributes into plain dictionary + step_dict = { + 'id': step.order_index, + 'type': module.type, + 'tool_id': module.get_tool_id(), + 'name': module.get_name(), + 'tool_state': module.get_state(), + 'tooltip': module.get_tooltip( static_path=url_for( '/static' ) ), + 'tool_errors': module.get_errors(), + 'data_inputs': module.get_data_inputs(), + 'data_outputs': module.get_data_outputs(), + 'form_html': module.get_config_form(), + 'annotation': annotation_str, + 'post_job_actions': {}, + 'workflow_outputs': [] + } + # Connections + input_connections = step.input_connections + input_connections_type = {} + multiple_input = {} # Boolean value indicating if this can be mutliple + if step.type is None or step.type == 'tool': + # Determine full (prefixed) names of valid input datasets + data_input_names = {} + + def callback( input, value, prefixed_name, prefixed_label ): + if isinstance( input, DataToolParameter ) or isinstance( input, DataCollectionToolParameter ): + data_input_names[ prefixed_name ] = True + multiple_input[ prefixed_name ] = input.multiple + if isinstance( input, DataToolParameter ): + input_connections_type[ input.name ] = "dataset" + if isinstance( input, DataCollectionToolParameter ): + input_connections_type[ input.name ] = "dataset_collection" + visit_input_values( module.tool.inputs, module.state.inputs, callback ) + # Filter + # FIXME: this removes connection without displaying a message currently! + input_connections = [ conn for conn in input_connections if conn.input_name in data_input_names ] + # post_job_actions + pja_dict = {} + for pja in step.post_job_actions: + pja_dict[pja.action_type + pja.output_name] = dict( + action_type=pja.action_type, + output_name=pja.output_name, + action_arguments=pja.action_arguments + ) + step_dict['post_job_actions'] = pja_dict + #workflow outputs + outputs = [] + for output in step.workflow_outputs: + outputs.append(output.output_name) + step_dict['workflow_outputs'] = outputs + # Encode input connections as dictionary + input_conn_dict = {} + for conn in input_connections: + input_type = "dataset" + if conn.input_name in input_connections_type: + input_type = input_connections_type[ conn.input_name ] + conn_dict = dict( id=conn.output_step.order_index, output_name=conn.output_name, input_type=input_type ) + if conn.input_name in multiple_input: + if conn.input_name in input_conn_dict: + input_conn_dict[ conn.input_name ].append( conn_dict ) + else: + input_conn_dict[ conn.input_name ] = [ conn_dict ] + else: + input_conn_dict[ conn.input_name ] = conn_dict + step_dict['input_connections'] = input_conn_dict + # Position + step_dict['position'] = step.position + # Add to return value + data['steps'][step.order_index] = step_dict + return data + + def _workflow_to_dict_export( self, trans, stored ): + """ Export the workflow contents to a dictionary ready for JSON-ification and export. 
""" workflow = stored.latest_workflow workflow_annotation = self.get_item_annotation_obj( trans.sa_session, trans.user, stored ) diff -r 2647ac82f98dcd90a133be765122f97d6dede45e -r b05becef6c64a4eb63bdc814a21243f2f5af799c lib/galaxy/webapps/galaxy/api/workflows.py --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -28,6 +28,7 @@ super( BaseAPIController, self ).__init__( app ) self.history_manager = histories.HistoryManager() self.workflow_manager = workflows.WorkflowsManager( app ) + self.workflow_contents_manager = workflows.WorkflowContentsManager() @expose_api def index(self, trans, **kwd): @@ -221,7 +222,8 @@ """ stored_workflow = self.__get_stored_accessible_workflow( trans, workflow_id ) - ret_dict = self._workflow_to_dict( trans, stored_workflow ) + style = kwd.get("style", "export") + ret_dict = self.workflow_contents_manager.workflow_to_dict( trans, stored_workflow, style=style ) if not ret_dict: # This workflow has a tool that's missing from the distribution message = "Workflow cannot be exported due to missing tools." diff -r 2647ac82f98dcd90a133be765122f97d6dede45e -r b05becef6c64a4eb63bdc814a21243f2f5af799c lib/galaxy/webapps/galaxy/controllers/workflow.py --- a/lib/galaxy/webapps/galaxy/controllers/workflow.py +++ b/lib/galaxy/webapps/galaxy/controllers/workflow.py @@ -669,126 +669,8 @@ """ trans.workflow_building_mode = True stored = self.get_stored_workflow( trans, id, check_ownership=True, check_accessible=False ) - workflow = stored.latest_workflow - # Pack workflow data into a dictionary and return - data = {} - data['name'] = workflow.name - data['steps'] = {} - data['upgrade_messages'] = {} - # For each step, rebuild the form and encode the state - for step in workflow.steps: - # Load from database representation - module = module_factory.from_workflow_step( trans, step ) - if not module: - step_annotation = self.get_item_annotation_obj( trans.sa_session, trans.user, step ) - annotation_str = "" - if step_annotation: - annotation_str = step_annotation.annotation - invalid_tool_form_html = """<div class="toolForm tool-node-error"><div class="toolFormTitle form-row-error">Unrecognized Tool: %s</div><div class="toolFormBody"><div class="form-row"> - The tool id '%s' for this tool is unrecognized.<br/><br/>To save this workflow, you will need to delete this step or enable the tool. - </div></div></div>""" % (step.tool_id, step.tool_id) - step_dict = { - 'id': step.order_index, - 'type': 'invalid', - 'tool_id': step.tool_id, - 'name': 'Unrecognized Tool: %s' % step.tool_id, - 'tool_state': None, - 'tooltip': None, - 'tool_errors': ["Unrecognized Tool Id: %s" % step.tool_id], - 'data_inputs': [], - 'data_outputs': [], - 'form_html': invalid_tool_form_html, - 'annotation': annotation_str, - 'input_connections': {}, - 'post_job_actions': {}, - 'workflow_outputs': [] - } - # Position - step_dict['position'] = step.position - # Add to return value - data['steps'][step.order_index] = step_dict - continue - # Fix any missing parameters - upgrade_message = module.check_and_update_state() - if upgrade_message: - # FIXME: Frontend should be able to handle workflow messages - # as a dictionary not just the values - data['upgrade_messages'][step.order_index] = upgrade_message.values() - # Get user annotation. 
- step_annotation = self.get_item_annotation_obj( trans.sa_session, trans.user, step ) - annotation_str = "" - if step_annotation: - annotation_str = step_annotation.annotation - # Pack attributes into plain dictionary - step_dict = { - 'id': step.order_index, - 'type': module.type, - 'tool_id': module.get_tool_id(), - 'name': module.get_name(), - 'tool_state': module.get_state(), - 'tooltip': module.get_tooltip( static_path=url_for( '/static' ) ), - 'tool_errors': module.get_errors(), - 'data_inputs': module.get_data_inputs(), - 'data_outputs': module.get_data_outputs(), - 'form_html': module.get_config_form(), - 'annotation': annotation_str, - 'post_job_actions': {}, - 'workflow_outputs': [] - } - # Connections - input_connections = step.input_connections - input_connections_type = {} - multiple_input = {} # Boolean value indicating if this can be mutliple - if step.type is None or step.type == 'tool': - # Determine full (prefixed) names of valid input datasets - data_input_names = {} - - def callback( input, value, prefixed_name, prefixed_label ): - if isinstance( input, DataToolParameter ) or isinstance( input, DataCollectionToolParameter ): - data_input_names[ prefixed_name ] = True - multiple_input[ prefixed_name ] = input.multiple - if isinstance( input, DataToolParameter ): - input_connections_type[ input.name ] = "dataset" - if isinstance( input, DataCollectionToolParameter ): - input_connections_type[ input.name ] = "dataset_collection" - visit_input_values( module.tool.inputs, module.state.inputs, callback ) - # Filter - # FIXME: this removes connection without displaying a message currently! - input_connections = [ conn for conn in input_connections if conn.input_name in data_input_names ] - # post_job_actions - pja_dict = {} - for pja in step.post_job_actions: - pja_dict[pja.action_type + pja.output_name] = dict( - action_type=pja.action_type, - output_name=pja.output_name, - action_arguments=pja.action_arguments - ) - step_dict['post_job_actions'] = pja_dict - #workflow outputs - outputs = [] - for output in step.workflow_outputs: - outputs.append(output.output_name) - step_dict['workflow_outputs'] = outputs - # Encode input connections as dictionary - input_conn_dict = {} - for conn in input_connections: - input_type = "dataset" - if conn.input_name in input_connections_type: - input_type = input_connections_type[ conn.input_name ] - conn_dict = dict( id=conn.output_step.order_index, output_name=conn.output_name, input_type=input_type ) - if conn.input_name in multiple_input: - if conn.input_name in input_conn_dict: - input_conn_dict[ conn.input_name ].append( conn_dict ) - else: - input_conn_dict[ conn.input_name ] = [ conn_dict ] - else: - input_conn_dict[ conn.input_name ] = conn_dict - step_dict['input_connections'] = input_conn_dict - # Position - step_dict['position'] = step.position - # Add to return value - data['steps'][step.order_index] = step_dict - return data + workflow_contents_manager = workflows.WorkflowContentsManager() + return workflow_contents_manager.workflow_to_dict( trans, stored, style="editor" ) @web.json def save_workflow( self, trans, id, workflow_data ): diff -r 2647ac82f98dcd90a133be765122f97d6dede45e -r b05becef6c64a4eb63bdc814a21243f2f5af799c test/api/test_workflows.py --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -324,6 +324,46 @@ first_input = downloaded_workflow[ "steps" ][ "0" ][ "inputs" ][ 0 ] assert first_input[ "name" ] == "WorkflowInput1" assert first_input[ "description" ] == "input1 description" + 
self._assert_has_keys( downloaded_workflow, "a_galaxy_workflow", "format-version", "annotation", "uuid", "steps" ) + for step in downloaded_workflow["steps"].values(): + self._assert_has_keys( + step, + 'id', + 'type', + 'tool_id', + 'tool_version', + 'name', + 'tool_state', + 'tool_errors', + 'annotation', + 'inputs', + 'user_outputs', + 'outputs' + ) + if step['type'] == "tool": + self._assert_has_keys( step, "post_job_actions" ) + + def test_export_editor( self ): + uploaded_workflow_id = self.workflow_populator.simple_workflow( "test_for_export" ) + downloaded_workflow = self._download_workflow( uploaded_workflow_id, style="editor" ) + self._assert_has_keys( downloaded_workflow, "name", "steps", "upgrade_messages" ) + for step in downloaded_workflow["steps"].values(): + self._assert_has_keys( + step, + 'id', + 'type', + 'tool_id', + 'name', + 'tool_state', + 'tooltip', + 'tool_errors', + 'data_inputs', + 'data_outputs', + 'form_html', + 'annotation', + 'post_job_actions', + 'workflow_outputs', + ) def test_import_export_with_runtime_inputs( self ): workflow = self.workflow_populator.load_workflow_from_resource( name="test_workflow_with_runtime_input" ) @@ -771,8 +811,11 @@ ) return self._post( route, import_data ) - def _download_workflow(self, workflow_id): - download_response = self._get( "workflows/%s/download" % workflow_id ) + def _download_workflow(self, workflow_id, style=None): + params = {} + if style: + params = {"style": style} + download_response = self._get( "workflows/%s/download" % workflow_id, params ) self._assert_status_code_is( download_response, 200 ) downloaded_workflow = download_response.json() return downloaded_workflow https://bitbucket.org/galaxy/galaxy-central/commits/3e807234d175/ Changeset: 3e807234d175 User: jmchilton Date: 2014-12-16 03:20:11+00:00 Summary: Introduce yet another workflow to_dict style - 'instance'. This is the style used by workflow 'show' - this consolidates all step-handling to_dict logic from controller/base.py, controller/workflow.py, and api/workflows.py into the new WorkflowContentsManager - with API test cases for everything. Affected #: 2 files diff -r b05becef6c64a4eb63bdc814a21243f2f5af799c -r 3e807234d17565ef13a6ade7157af87c690c630b lib/galaxy/managers/workflows.py --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -317,12 +317,17 @@ def workflow_to_dict( self, trans, stored, style="export" ): """ Export the workflow contents to a dictionary ready for JSON-ification and to be - sent out via API for instance. There are two styles of export allowed: 'editor' and 'export'. - The Galaxy team will do its best to preserve the backward compatibility of the 'export' style - - but the 'editor' style is subject to rapid and unannounced changes. + sent out via API for instance. There are three styles of export allowed: 'export', 'instance', and + 'editor'. The Galaxy team will do its best to preserve the backward compatibility of the + 'export' style - this is the export method meant to be portable across Galaxy instances and over + time. The 'editor' style is subject to rapid and unannounced changes. The 'instance' export + option describes the workflow in a context more tied to the current Galaxy instance and includes + fields like 'url' and 'owner' and actual unencoded step ids instead of 'order_index'.
""" if style == "editor": return self._workflow_to_dict_editor( trans, stored ) + elif style == "instance": + return self._workflow_to_dict_instance( trans, stored ) else: return self._workflow_to_dict_export( trans, stored ) @@ -552,6 +557,45 @@ data['steps'][step.order_index] = step_dict return data + def _workflow_to_dict_instance(self, trans, stored): + item = stored.to_dict( view='element', value_mapper={ 'id': trans.security.encode_id } ) + workflow = stored.latest_workflow + item['url'] = url_for('workflow', id=item['id']) + item['owner'] = stored.user.username + inputs = {} + for step in workflow.steps: + step_type = step.type + if step_type in ['data_input', 'data_collection_input']: + if step.tool_inputs and "name" in step.tool_inputs: + label = step.tool_inputs['name'] + elif step_type == "data_input": + label = "Input Dataset" + elif step_type == "data_collection_input": + label = "Input Dataset Collection" + else: + raise ValueError("Invalid step_type %s" % step_type) + inputs[step.id] = {'label': label, 'value': ""} + else: + pass + # Eventually, allow regular tool parameters to be inserted and modified at runtime. + # p = step.get_required_parameters() + item['inputs'] = inputs + item['annotation'] = self.get_item_annotation_str( trans.sa_session, stored.user, stored ) + steps = {} + for step in workflow.steps: + steps[step.id] = {'id': step.id, + 'type': step.type, + 'tool_id': step.tool_id, + 'tool_version': step.tool_version, + 'annotation': self.get_item_annotation_str( trans.sa_session, stored.user, step ), + 'tool_inputs': step.tool_inputs, + 'input_steps': {}} + for conn in step.input_connections: + steps[step.id]['input_steps'][conn.input_name] = {'source_step': conn.output_step_id, + 'step_output': conn.output_name} + item['steps'] = steps + return item + class MissingToolsException(object): diff -r b05becef6c64a4eb63bdc814a21243f2f5af799c -r 3e807234d17565ef13a6ade7157af87c690c630b lib/galaxy/webapps/galaxy/api/workflows.py --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -76,8 +76,7 @@ if trans.sa_session.query(trans.app.model.StoredWorkflowUserShareAssociation).filter_by(user=trans.user, stored_workflow=stored_workflow).count() == 0: message = "Workflow is neither importable, nor owned by or shared with current user" raise exceptions.ItemAccessibilityException( message ) - latest_workflow = stored_workflow.latest_workflow - return self.__encode_workflow( trans, stored_workflow, latest_workflow ) + return self.workflow_contents_manager.workflow_to_dict( trans, stored_workflow, style="instance" ) @expose_api def create(self, trans, payload, **kwd): @@ -303,7 +302,7 @@ else: message = "Updating workflow requires dictionary containing 'workflow' attribute with new JSON description." 
raise exceptions.RequestParameterInvalidException( message ) - return self.__encode_workflow( trans, stored_workflow, workflow ) + return self.workflow_contents_manager.workflow_to_dict( trans, stored_workflow, style="instance" ) def __api_import_new_workflow( self, trans, payload, **kwd ): data = payload['workflow'] @@ -510,44 +509,6 @@ ) return self.__encode_invocation_step( trans, invocation_step ) - def __encode_workflow( self, trans, stored_workflow, workflow ): - item = stored_workflow.to_dict( view='element', value_mapper={ 'id': trans.security.encode_id } ) - item['url'] = url_for('workflow', id=item['id']) - item['owner'] = stored_workflow.user.username - inputs = {} - for step in workflow.steps: - step_type = step.type - if step_type in ['data_input', 'data_collection_input']: - if step.tool_inputs and "name" in step.tool_inputs: - label = step.tool_inputs['name'] - elif step_type == "data_input": - label = "Input Dataset" - elif step_type == "data_collection_input": - label = "Input Dataset Collection" - else: - raise ValueError("Invalid step_type %s" % step_type) - inputs[step.id] = {'label': label, 'value': ""} - else: - pass - # Eventually, allow regular tool parameters to be inserted and modified at runtime. - # p = step.get_required_parameters() - item['inputs'] = inputs - item['annotation'] = self.get_item_annotation_str( trans.sa_session, stored_workflow.user, stored_workflow ) - steps = {} - for step in workflow.steps: - steps[step.id] = {'id': step.id, - 'type': step.type, - 'tool_id': step.tool_id, - 'tool_version': step.tool_version, - 'annotation': self.get_item_annotation_str( trans.sa_session, stored_workflow.user, step ), - 'tool_inputs': step.tool_inputs, - 'input_steps': {}} - for conn in step.input_connections: - steps[step.id]['input_steps'][conn.input_name] = {'source_step': conn.output_step_id, - 'step_output': conn.output_name} - item['steps'] = steps - return item - def __encode_invocation_step( self, trans, invocation_step ): return self.encode_all_ids( trans, https://bitbucket.org/galaxy/galaxy-central/commits/4b542b64f6b2/ Changeset: 4b542b64f6b2 User: jmchilton Date: 2014-12-16 03:20:11+00:00 Summary: Allow implicit connections between workflow steps (no editor GUI yet). This means steps can depend on one another without connecting an output of one step to the input of another. This could potentially address all sorts of untraditional (in a Galaxy sense) workflows where some sort of data is managed externally. The most important use I think I have heard discussed is that of data managers - this can be used in cases where data managers depend on one another (grab the fasta files in one step, index them in another) or workflows where a downstream analysis depends on index data populated via data managers in earlier steps. Not really sure how to represent these in the workflow editor - but this is a power user feature anyway so hopefully that is not super pressing. The YAML to workflow DSL supports the operation (see test cases) so these power users (a euphemism for Dan I guess) can just use that for now. Affected #: 6 files diff -r 3e807234d17565ef13a6ade7157af87c690c630b -r 4b542b64f6b2add47fd55d535f06d026f26f7e3c lib/galaxy/managers/workflows.py --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -529,7 +529,7 @@ visit_input_values( module.tool.inputs, module.state.inputs, callback ) # Filter # FIXME: this removes connection without displaying a message currently!
- input_connections = [ conn for conn in input_connections if conn.input_name in data_input_names ] + input_connections = [ conn for conn in input_connections if (conn.input_name in data_input_names or conn.non_data_connection) ] # Encode input connections as dictionary input_conn_dict = {} diff -r 3e807234d17565ef13a6ade7157af87c690c630b -r 4b542b64f6b2add47fd55d535f06d026f26f7e3c lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -3077,6 +3077,12 @@ class WorkflowStepConnection( object ): + # Constant used in lieu of output_name and input_name to indicate an + # implicit connection between two steps that is not dependent on a dataset + # or a dataset collection. This allows, for instance, data manager steps to set up + # index data before a normal tool runs or for workflows that manage data + # outside of Galaxy. + NON_DATA_CONNECTION = "__NO_INPUT_OUTPUT_NAME__" def __init__( self ): self.output_step_id = None @@ -3084,6 +3090,15 @@ self.input_step_id = None self.input_name = None + def set_non_data_connection(self): + self.output_name = WorkflowStepConnection.NON_DATA_CONNECTION + self.input_name = WorkflowStepConnection.NON_DATA_CONNECTION + + @property + def non_data_connection(self): + return (self.output_name == WorkflowStepConnection.NON_DATA_CONNECTION and + self.input_name == WorkflowStepConnection.NON_DATA_CONNECTION) + class WorkflowOutput(object): @@ -3153,6 +3168,13 @@ step_invocations[ step_id ].append( invocation_step ) return step_invocations + def step_invocations_for_step_id( self, step_id ): + step_invocations = [] + for invocation_step in self.steps: + if step_id == invocation_step.workflow_step_id: + step_invocations.append( invocation_step ) + return step_invocations + @staticmethod def poll_active_workflow_ids( sa_session, diff -r 3e807234d17565ef13a6ade7157af87c690c630b -r 4b542b64f6b2add47fd55d535f06d026f26f7e3c lib/galaxy/workflow/run.py --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -136,6 +136,8 @@ for step in remaining_steps: jobs = None try: + self.__check_implicitly_dependent_steps(step) + jobs = self._invoke_step( step ) for job in (util.listify( jobs ) or [None]): # Record invocation @@ -160,6 +162,38 @@ # invocations. return self.progress.outputs + def __check_implicitly_dependent_steps( self, step ): + """ Method will delay the workflow evaluation if implicitly dependent + steps (steps that are dependent, but not through an input->output connection) are not + yet complete. + """ + for input_connection in step.input_connections: + if input_connection.non_data_connection: + output_id = input_connection.output_step.id + self.__check_implicitly_dependent_step( output_id ) + + def __check_implicitly_dependent_step( self, output_id ): + step_invocations = self.workflow_invocation.step_invocations_for_step_id( output_id ) + + # No steps created yet - have to delay evaluation. + if not step_invocations: + raise modules.DelayedWorkflowEvaluation() + + for step_invocation in step_invocations: + job = step_invocation.job + if job: + # At least one job is incomplete. + if not job.finished: + raise modules.DelayedWorkflowEvaluation() + + if job.state != job.states.OK: + raise modules.CancelWorkflowEvaluation() + + else: + # TODO: Handle implicit dependency on stuff like + # pause steps.
+ pass + def _invoke_step( self, step ): + jobs = step.module.execute( self.trans, self.progress, self.workflow_invocation, step ) + return jobs diff -r 3e807234d17565ef13a6ade7157af87c690c630b -r 4b542b64f6b2add47fd55d535f06d026f26f7e3c test/api/test_workflows.py --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -107,7 +107,7 @@ invocation_details = invocation_details_response.json() return invocation_details - def _run_jobs( self, jobs_yaml, history_id=None ): + def _run_jobs( self, jobs_yaml, history_id=None, wait=True ): if history_id is None: history_id = self.history_id workflow_id = self._upload_yaml_workflow( @@ -153,8 +153,9 @@ invocation_id = invocation[ "id" ] # Wait for workflow to become fully scheduled and then for all jobs # complete. - self.wait_for_invocation( workflow_id, invocation_id ) - self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + if wait: + self.wait_for_invocation( workflow_id, invocation_id ) + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) jobs = self._history_jobs( history_id ) return RunJobsSummary( history_id=history_id, @@ -516,6 +517,60 @@ invocation = self._invocation_details( uploaded_workflow_id, invocation_id ) assert invocation[ 'state' ] == 'cancelled' + def test_run_with_implicit_connection( self ): + history_id = self.dataset_populator.new_history() + run_summary = self._run_jobs(""" +steps: +- label: test_input + type: input +- label: first_cat + tool_id: cat1 + state: + input1: + $link: test_input +- label: the_pause + type: pause + connect: + input: + - first_cat#out_file1 +- label: second_cat + tool_id: cat1 + state: + input1: + $link: the_pause +- label: third_cat + tool_id: random_lines1 + connect: + $step: second_cat + state: + num_lines: 1 + input: + $link: test_input + seed_source: + seed_source_selector: set_seed + seed: asdf + __current_case__: 1 +test_data: + test_input: "hello world" +""", history_id=history_id, wait=False) + time.sleep( 2 ) + history_id = run_summary.history_id + workflow_id = run_summary.workflow_id + invocation_id = run_summary.invocation_id + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + invocation = self._invocation_details( workflow_id, invocation_id ) + assert invocation[ 'state' ] != 'scheduled' + # Expect two jobs - the upload and first cat. random_lines1 shouldn't run - + # it is implicitly dependent on second cat. + assert len( self._history_jobs( history_id ) ) == 2 + + self.__review_paused_steps( workflow_id, invocation_id, order_index=2, action=True ) + self.wait_for_invocation( workflow_id, invocation_id ) + time.sleep(1) + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + time.sleep(1) + assert len( self._history_jobs( history_id ) ) == 4 + def test_cannot_run_inaccessible_workflow( self ): workflow = self.workflow_populator.load_workflow( name="test_for_run_cannot_access" ) workflow_request, history_id = self._setup_workflow_run( workflow ) diff -r 3e807234d17565ef13a6ade7157af87c690c630b -r 4b542b64f6b2add47fd55d535f06d026f26f7e3c test/api/test_workflows_from_yaml.py --- a/test/api/test_workflows_from_yaml.py +++ b/test/api/test_workflows_from_yaml.py @@ -102,3 +102,34 @@ print self._get("workflows/%s/download" % workflow_id).json() assert False # TODO: fill out test...
+ + def test_implicit_connections( self ): + workflow_id = self._upload_yaml_workflow(""" +- label: test_input + type: input +- label: first_cat + tool_id: cat1 + state: + input1: + $link: test_input +- label: the_pause + type: pause + connect: + input: + - first_cat#out_file1 +- label: second_cat + tool_id: cat1 + state: + input1: + $link: the_pause +- label: third_cat + tool_id: cat1 + connect: + $step: second_cat + state: + input1: + $link: test_input +""") + workflow = self._get("workflows/%s/download" % workflow_id).json() + print workflow + assert False diff -r 3e807234d17565ef13a6ade7157af87c690c630b -r 4b542b64f6b2add47fd55d535f06d026f26f7e3c test/api/yaml_to_workflow.py --- a/test/api/yaml_to_workflow.py +++ b/test/api/yaml_to_workflow.py @@ -262,6 +262,8 @@ values = [ values ] for value in values: if not isinstance(value, dict): + if key == "$step": + value += "#__NO_INPUT_OUTPUT_NAME__" value_parts = str(value).split("#") if len(value_parts) == 1: value_parts.append("output") @@ -270,6 +272,8 @@ id = context.labels[id] value = {"id": int(id), "output_name": value_parts[1]} input_connection_value.append(value) + if key == "$step": + key = "__NO_INPUT_OUTPUT_NAME__" input_connections[key] = input_connection_value https://bitbucket.org/galaxy/galaxy-central/commits/10d20731e6cc/ Changeset: 10d20731e6cc User: jmchilton Date: 2014-12-16 03:20:12+00:00 Summary: Slightly improved error message for incorrect workflow connection uploads. Affected #: 1 file diff -r 4b542b64f6b2add47fd55d535f06d026f26f7e3c -r 10d20731e6ccbe4a48a110d95ffe02a0e04ffcd3 lib/galaxy/managers/workflows.py --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -201,6 +201,10 @@ if not isinstance(conn_list, list): # Older style singleton connection conn_list = [conn_list] for conn_dict in conn_list: + if 'output_name' not in conn_dict or 'id' not in conn_dict: + template = "Invalid connection [%s] - must be dict with output_name and id fields." + message = template % conn_dict + raise exceptions.MessageException(message) conn = model.WorkflowStepConnection() conn.input_step = step conn.input_name = input_name https://bitbucket.org/galaxy/galaxy-central/commits/afed68c51bc2/ Changeset: afed68c51bc2 User: jmchilton Date: 2014-12-16 03:20:12+00:00 Summary: De-duplication of step creation across update and create workflows. Affected #: 1 file diff -r 10d20731e6ccbe4a48a110d95ffe02a0e04ffcd3 -r afed68c51bc23c80aa99c9379649476ffba4f796 lib/galaxy/managers/workflows.py --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -167,14 +167,11 @@ # First pass to build step objects and populate basic values for step_index in step_indices: step_dict = supplied_steps[ step_index ] - # Create the model class for the step - step = model.WorkflowStep() + + module, step = self.__module_from_dict( trans, step_dict, secure=False ) steps.append( step ) steps_by_external_id[ step_dict['id' ] ] = step - # FIXME: Position should be handled inside module - step.position = step_dict['position'] - module = module_factory.from_dict( trans, step_dict, secure=False ) - module.save_to_step( step ) + if module.type == 'tool' and module.tool is None: # A required tool is not available in the local Galaxy instance.
missing_tool_tup = ( step_dict[ 'tool_id' ], step_dict[ 'name' ], step_dict[ 'tool_version' ] ) @@ -185,13 +182,7 @@ step.config = json.dumps(step_dict) if step.tool_errors: workflow.has_errors = True - # Stick this in the step temporarily - step.temp_input_connections = step_dict['input_connections'] - # Save step annotation. - annotation = step_dict[ 'annotation' ] - if annotation: - annotation = sanitize_html( annotation, 'utf-8', 'text/html' ) - self.add_item_annotation( trans.sa_session, trans.get_user(), step, annotation ) + # Second pass to deal with connections between steps for step in steps: # Input connections @@ -269,14 +260,10 @@ # First pass to build step objects and populate basic values for key, step_dict in data['steps'].iteritems(): + module, step = self.__module_from_dict( trans, step_dict, secure=from_editor ) # Create the model class for the step - step = model.WorkflowStep() steps.append( step ) steps_by_external_id[ step_dict['id' ] ] = step - # FIXME: Position should be handled inside module - step.position = step_dict['position'] - module = module_factory.from_dict( trans, step_dict, secure=from_editor ) - module.save_to_step( step ) if 'workflow_outputs' in step_dict: for output_name in step_dict['workflow_outputs']: m = model.WorkflowOutput(workflow_step=step, output_name=output_name) @@ -284,13 +271,7 @@ if step.tool_errors: # DBTODO Check for conditional inputs here. workflow.has_errors = True - # Stick this in the step temporarily - step.temp_input_connections = step_dict['input_connections'] - # Save step annotation. - annotation = step_dict[ 'annotation' ] - if annotation: - annotation = sanitize_html( annotation, 'utf-8', 'text/html' ) - self.add_item_annotation( trans.sa_session, trans.get_user(), step, annotation ) + # Second pass to deal with connections between steps for step in steps: # Input connections @@ -600,6 +581,28 @@ item['steps'] = steps return item + def __module_from_dict( self, trans, step_dict, secure ): + """ Create a WorkflowStep model object and corresponding module representing + type-specific functionality from the incoming dictionary. + """ + step = model.WorkflowStep() + + # TODO: Consider handling position inside module. + step.position = step_dict['position'] + + module = module_factory.from_dict( trans, step_dict, secure=secure ) + module.save_to_step( step ) + + annotation = step_dict[ 'annotation' ] + if annotation: + annotation = sanitize_html( annotation, 'utf-8', 'text/html' ) + self.add_item_annotation( trans.sa_session, trans.get_user(), step, annotation ) + + # Stick this in the step temporarily + step.temp_input_connections = step_dict['input_connections'] + + return module, step + class MissingToolsException(object): https://bitbucket.org/galaxy/galaxy-central/commits/f7a77c6b57c7/ Changeset: f7a77c6b57c7 User: jmchilton Date: 2014-12-16 03:20:12+00:00 Summary: De-duplication for setting up workflow connections during creation/update.
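For reference, the payload this consolidated connection handling consumes is the per-step 'input_connections' mapping validated in the previous changeset; a rough sketch of one entry follows (the step id, input name, and output name are illustrative, not taken from a real workflow):

    # Each key is an input name on the receiving step. The value is a single
    # dict (older single-connection style) or a list of dicts, where 'id' is
    # the external id of the source step and 'output_name' names its output.
    input_connections = {
        "input1": [
            {"id": 0, "output_name": "out_file1"},
        ],
    }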
Affected #: 1 file diff -r afed68c51bc23c80aa99c9379649476ffba4f796 -r f7a77c6b57c74750097c3802185716ebd4e912b1 lib/galaxy/managers/workflows.py --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -184,24 +184,7 @@ workflow.has_errors = True # Second pass to deal with connections between steps - for step in steps: - # Input connections - for input_name, conn_list in step.temp_input_connections.iteritems(): - if not conn_list: - continue - if not isinstance(conn_list, list): # Older style singleton connection - conn_list = [conn_list] - for conn_dict in conn_list: - if 'output_name' not in conn_dict or 'id' not in conn_dict: - template = "Invalid connection [%s] - must be dict with output_name and id fields." - message = template % conn_dict - raise exceptions.MessageException(message) - conn = model.WorkflowStepConnection() - conn.input_step = step - conn.input_name = input_name - conn.output_name = conn_dict['output_name'] - conn.output_step = steps_by_external_id[ conn_dict['id'] ] - del step.temp_input_connections + self.__connect_workflow_steps( steps, steps_by_external_id ) # Order the steps if possible attach_ordered_steps( workflow, steps ) @@ -273,18 +256,8 @@ workflow.has_errors = True # Second pass to deal with connections between steps - for step in steps: - # Input connections - for input_name, conns in step.temp_input_connections.iteritems(): - if conns: - conn_dicts = conns if isinstance(conns, list) else [ conns ] - for conn_dict in conn_dicts: - conn = model.WorkflowStepConnection() - conn.input_step = step - conn.input_name = input_name - conn.output_name = conn_dict['output_name'] - conn.output_step = steps_by_external_id[ conn_dict['id'] ] - del step.temp_input_connections + self.__connect_workflow_steps( steps, steps_by_external_id ) + # Order the steps if possible attach_ordered_steps( workflow, steps ) # Connect up @@ -603,6 +576,31 @@ return module, step + def __connect_workflow_steps( self, steps, steps_by_external_id ): + """ Second pass to deal with connections between steps. + + Create workflow connection objects using externally specified ids + used during creation or update. + """ + for step in steps: + # Input connections + for input_name, conn_list in step.temp_input_connections.iteritems(): + if not conn_list: + continue + if not isinstance(conn_list, list): # Older style singleton connection + conn_list = [conn_list] + for conn_dict in conn_list: + if 'output_name' not in conn_dict or 'id' not in conn_dict: + template = "Invalid connection [%s] - must be dict with output_name and id fields." + message = template % conn_dict + raise exceptions.MessageException(message) + conn = model.WorkflowStepConnection() + conn.input_step = step + conn.input_name = input_name + conn.output_name = conn_dict['output_name'] + conn.output_step = steps_by_external_id[ conn_dict['id'] ] + del step.temp_input_connections + class MissingToolsException(object): https://bitbucket.org/galaxy/galaxy-central/commits/f9595a9840c1/ Changeset: f9595a9840c1 User: jmchilton Date: 2014-12-16 03:20:12+00:00 Summary: Create abstraction for walking over step dicts to share between create/update methods. Affected #: 1 file diff -r f7a77c6b57c74750097c3802185716ebd4e912b1 -r f9595a9840c172a85fd8a5335688afb924afaa97 lib/galaxy/managers/workflows.py --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -155,19 +155,7 @@ # the local Galaxy instance. Each tuple in the list of missing_tool_tups # will be ( tool_id, tool_name, tool_version ).
missing_tool_tups = [] - supplied_steps = data[ 'steps' ] - # Try to iterate through imported workflow in such a way as to - # preserve step order. - step_indices = supplied_steps.keys() - try: - step_indices = sorted( step_indices, key=int ) - except ValueError: - # to defensive, were these ever or will they ever not be integers? - pass - # First pass to build step objects and populate basic values - for step_index in step_indices: - step_dict = supplied_steps[ step_index ] - + for step_dict in self.__walk_step_dicts( data ): module, step = self.__module_from_dict( trans, step_dict, secure=False ) steps.append( step ) steps_by_external_id[ step_dict['id' ] ] = step @@ -242,7 +230,7 @@ raise MissingToolsException(workflow, errors) # First pass to build step objects and populate basic values - for key, step_dict in data['steps'].iteritems(): + for step_dict in self.__walk_step_dicts( data ): module, step = self.__module_from_dict( trans, step_dict, secure=from_editor ) # Create the model class for the step steps.append( step ) steps_by_external_id[ step_dict['id' ] ] = step @@ -554,6 +542,24 @@ item['steps'] = steps return item + def __walk_step_dicts( self, data ): + """ Walk over the supplied step dictionaries and return them in a way designed + to preserve step order when possible. + """ + supplied_steps = data[ 'steps' ] + # Try to iterate through imported workflow in such a way as to + # preserve step order. + step_indices = supplied_steps.keys() + try: + step_indices = sorted( step_indices, key=int ) + except ValueError: + # too defensive - were these ever, or will they ever, not be integers? + pass + # First pass to build step objects and populate basic values + for step_index in step_indices: + step_dict = supplied_steps[ step_index ] + yield step_dict + def __module_from_dict( self, trans, step_dict, secure ): """ Create a WorkflowStep model object and corresponding module representing type-specific functionality from the incoming dictionary. https://bitbucket.org/galaxy/galaxy-central/commits/10696df64a3a/ Changeset: 10696df64a3a User: jmchilton Date: 2014-12-16 03:20:12+00:00 Summary: Augment workflow step model for improved tracking. Give every step a UUID that can be preserved across edits to the workflow. Likewise, allow every step to be given a label (a unique short name within that workflow) - this provides a human-consumable way to reference steps for use in tests and when driving workflows via the API. The workflow editor doesn't yet (and might never) display these attributes, but it does preserve them across workflow saves. Implement automated testing that uploading, updating, and exporting workflows handles UUIDs and labels. Manually tested that the workflow editor preserves labels and UUIDs across changes. Affected #: 6 files diff -r f9595a9840c172a85fd8a5335688afb924afaa97 -r 10696df64a3a439ba87f0e3b6089ddd924c37f45 client/galaxy/scripts/galaxy.workflow_editor.canvas.js --- a/client/galaxy/scripts/galaxy.workflow_editor.canvas.js +++ b/client/galaxy/scripts/galaxy.workflow_editor.canvas.js @@ -784,6 +784,8 @@ this.tooltip = data.tooltip ? data.tooltip : ""; this.annotation = data.annotation; this.post_job_actions = data.post_job_actions ? data.post_job_actions : {}; + this.label = data.label; + this.uuid = data.uuid; this.workflow_outputs = data.workflow_outputs ?
data.workflow_outputs : []; var node = this; @@ -991,6 +993,8 @@ position : $(node.element).position(), annotation: node.annotation, post_job_actions: node.post_job_actions, + uuid: node.uuid, + label: node.label, workflow_outputs: node.workflow_outputs }; nodes[ node.id ] = node_data; diff -r f9595a9840c172a85fd8a5335688afb924afaa97 -r 10696df64a3a439ba87f0e3b6089ddd924c37f45 lib/galaxy/managers/workflows.py --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -312,6 +312,8 @@ 'annotation': annotation_str, 'input_connections': {}, 'post_job_actions': {}, + 'uuid': str(step.uuid), + 'label': step.label or None, 'workflow_outputs': [] } # Position @@ -344,6 +346,8 @@ 'form_html': module.get_config_form(), 'annotation': annotation_str, 'post_job_actions': {}, + 'uuid': str(step.uuid), + 'label': step.label or None, 'workflow_outputs': [] } # Connections @@ -438,6 +442,8 @@ 'name': module.get_name(), 'tool_state': module.get_state( secure=False ), 'tool_errors': module.get_errors(), + 'uuid': str(step.uuid), + 'label': step.label or None, ## 'data_inputs': module.get_data_inputs(), ## 'data_outputs': module.get_data_outputs(), 'annotation': annotation_str @@ -568,7 +574,10 @@ # TODO: Consider handling position inside module. step.position = step_dict['position'] - + if "uuid" in step_dict: + step.uuid = step_dict["uuid"] + if "label" in step_dict: + step.label = step_dict["label"] module = module_factory.from_dict( trans, step_dict, secure=secure ) module.save_to_step( step ) diff -r f9595a9840c172a85fd8a5335688afb924afaa97 -r 10696df64a3a439ba87f0e3b6089ddd924c37f45 lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -3074,6 +3074,7 @@ self.position = None self.input_connections = [] self.config = None + self.uuid = uuid4() class WorkflowStepConnection( object ): diff -r f9595a9840c172a85fd8a5335688afb924afaa97 -r 10696df64a3a439ba87f0e3b6089ddd924c37f45 lib/galaxy/model/mapping.py --- a/lib/galaxy/model/mapping.py +++ b/lib/galaxy/model/mapping.py @@ -715,6 +715,8 @@ Column( "position", JSONType ), Column( "config", JSONType ), Column( "order_index", Integer ), + Column( "uuid", UUIDType ), + Column( "label", Unicode(255) ), ## Column( "input_connections", JSONType ) ) diff -r f9595a9840c172a85fd8a5335688afb924afaa97 -r 10696df64a3a439ba87f0e3b6089ddd924c37f45 lib/galaxy/model/migrate/versions/0125_workflow_step_tracking.py --- /dev/null +++ b/lib/galaxy/model/migrate/versions/0125_workflow_step_tracking.py @@ -0,0 +1,56 @@ +""" +Migration script to enhance workflow step usability by adding labels and UUIDs. 
+""" +from sqlalchemy import * +from sqlalchemy.orm import * +from migrate import * +from migrate.changeset import * +from galaxy.model.custom_types import * + +import datetime +now = datetime.datetime.utcnow + +import logging +log = logging.getLogger( __name__ ) + +metadata = MetaData() + + +def upgrade(migrate_engine): + metadata.bind = migrate_engine + print __doc__ + metadata.reflect() + + StepLabel_column = Column( "label", TrimmedString(255) ) + StepUUID_column = Column( "uuid", UUIDType, nullable=True ) + __add_column( StepLabel_column, "workflow_step", metadata ) + __add_column( StepUUID_column, "workflow_step", metadata ) + + +def downgrade(migrate_engine): + metadata.bind = migrate_engine + metadata.reflect() + + for table in TABLES: + __drop(table) + + __drop_column( "label", "workflow_step", metadata ) + __drop_column( "uuid", "workflow_step", metadata ) + + +def __add_column(column, table_name, metadata, **kwds): + try: + table = Table( table_name, metadata, autoload=True ) + column.create( table, **kwds ) + except Exception as e: + print str(e) + log.exception( "Adding column %s column failed." % column) + + +def __drop_column( column_name, table_name, metadata ): + try: + table = Table( table_name, metadata, autoload=True ) + getattr( table.c, column_name ).drop() + except Exception as e: + print str(e) + log.exception( "Dropping column %s failed." % column_name ) diff -r f9595a9840c172a85fd8a5335688afb924afaa97 -r 10696df64a3a439ba87f0e3b6089ddd924c37f45 test/api/test_workflows.py --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -3,6 +3,7 @@ from base import api from json import dumps from collections import namedtuple +from uuid import uuid4 import time @@ -227,9 +228,11 @@ def test_upload_deprecated( self ): self.__test_upload( use_deprecated_route=True ) - def __test_upload( self, use_deprecated_route=False, name="test_import" ): + def __test_upload( self, use_deprecated_route=False, name="test_import", workflow=None ): + if workflow is None: + workflow = self.workflow_populator.load_workflow( name=name ) data = dict( - workflow=dumps( self.workflow_populator.load_workflow( name=name ) ), + workflow=dumps( workflow ), ) if use_deprecated_route: route = "workflows/upload" @@ -242,8 +245,25 @@ def test_update( self ): original_workflow = self.workflow_populator.load_workflow( name="test_import" ) + uuids = {} + labels = {} - upload_response = self.__test_upload( ) + for order_index, step_dict in original_workflow["steps"].iteritems(): + uuid = str(uuid4()) + step_dict["uuid"] = uuid + uuids[order_index] = uuid + label = "label_%s" % order_index + step_dict["label"] = label + labels[order_index] = label + + def check_label_and_uuid(order_index, step_dict): + assert order_index in uuids + assert order_index in labels + + self.assertEquals(uuids[order_index], step_dict["uuid"]) + self.assertEquals(labels[order_index], step_dict["label"]) + + upload_response = self.__test_upload( workflow=original_workflow ) workflow_id = upload_response.json()["id"] def update(workflow_object): @@ -260,20 +280,24 @@ steps = workflow_content["steps"] def tweak_step(step): - assert step['position']['top'] != 1 - assert step['position']['left'] != 1 - step['position'] = {'top': 1, 'left': 1} + order_index, step_dict = step + check_label_and_uuid( order_index, step_dict) + assert step_dict['position']['top'] != 1 + assert step_dict['position']['left'] != 1 + step_dict['position'] = {'top': 1, 'left': 1} - map(tweak_step, steps.values()) + map(tweak_step, steps.iteritems()) 
update(workflow_content) def check_step(step): - assert step['position']['top'] == 1 - assert step['position']['left'] == 1 + order_index, step_dict = step + check_label_and_uuid(order_index, step_dict) + assert step_dict['position']['top'] == 1 + assert step_dict['position']['left'] == 1 updated_workflow_content = self._download_workflow(workflow_id) - map(check_step, updated_workflow_content['steps'].values()) + map(check_step, updated_workflow_content['steps'].iteritems()) # Re-update against original workflow... update(original_workflow) @@ -281,7 +305,7 @@ updated_workflow_content = self._download_workflow(workflow_id) # Make sure the positions have been updated. - map(tweak_step, updated_workflow_content['steps'].values()) + map(tweak_step, updated_workflow_content['steps'].iteritems()) def test_import_deprecated( self ): workflow_id = self.workflow_populator.simple_workflow( "test_import_published_deprecated", publish=True ) https://bitbucket.org/galaxy/galaxy-central/commits/b6f28aec47bd/ Changeset: b6f28aec47bd User: jmchilton Date: 2014-12-16 03:20:12+00:00 Summary: Pack scripts (for workflow step tracking changes). Affected #: 1 file diff -r 10696df64a3a439ba87f0e3b6089ddd924c37f45 -r b6f28aec47bd4fd6294a5aa5d1ea19fafcdb4f7e static/scripts/galaxy.workflow_editor.canvas.js --- a/static/scripts/galaxy.workflow_editor.canvas.js +++ b/static/scripts/galaxy.workflow_editor.canvas.js @@ -784,6 +784,8 @@ this.tooltip = data.tooltip ? data.tooltip : ""; this.annotation = data.annotation; this.post_job_actions = data.post_job_actions ? data.post_job_actions : {}; + this.label = data.label; + this.uuid = data.uuid; this.workflow_outputs = data.workflow_outputs ? data.workflow_outputs : []; var node = this; @@ -991,6 +993,8 @@ position : $(node.element).position(), annotation: node.annotation, post_job_actions: node.post_job_actions, + uuid: node.uuid, + label: node.label, workflow_outputs: node.workflow_outputs }; nodes[ node.id ] = node_data; https://bitbucket.org/galaxy/galaxy-central/commits/2543786e0614/ Changeset: 2543786e0614 User: jmchilton Date: 2014-12-16 03:20:12+00:00 Summary: Ensure unique step labels and UUIDs across workflows during create/update. Affected #: 5 files diff -r b6f28aec47bd4fd6294a5aa5d1ea19fafcdb4f7e -r 2543786e06148d5ceabc603553828e4040882c7b lib/galaxy/exceptions/__init__.py --- a/lib/galaxy/exceptions/__init__.py +++ b/lib/galaxy/exceptions/__init__.py @@ -51,6 +51,11 @@ err_code = error_codes.USER_SLUG_DUPLICATE +class DuplicatedIdentifierException( MessageException ): + status_code = 400 + err_code = error_codes.USER_IDENTIFIER_DUPLICATE + + class ObjectAttributeInvalidException( MessageException ): status_code = 400 err_code = error_codes.USER_OBJECT_ATTRIBUTE_INVALID diff -r b6f28aec47bd4fd6294a5aa5d1ea19fafcdb4f7e -r 2543786e06148d5ceabc603553828e4040882c7b lib/galaxy/exceptions/error_codes.json --- a/lib/galaxy/exceptions/error_codes.json +++ b/lib/galaxy/exceptions/error_codes.json @@ -55,6 +55,11 @@ "message": "The request contains unknown type of contents." }, { + "name": "USER_IDENTIFIER_DUPLICATE", + "code": 400011, + "message": "Request contained a duplicated identifier that must be unique." + }, + { "name": "USER_TOOL_META_PARAMETER_PROBLEM", "code": 400011, "message": "Supplied incorrect or incompatible tool meta parameters."
diff -r b6f28aec47bd4fd6294a5aa5d1ea19fafcdb4f7e -r 2543786e06148d5ceabc603553828e4040882c7b lib/galaxy/managers/workflows.py --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -561,9 +561,24 @@ except ValueError: # too defensive - were these ever, or will they ever, not be integers? pass + + discovered_labels = set() + discovered_uuids = set() + # First pass to build step objects and populate basic values for step_index in step_indices: step_dict = supplied_steps[ step_index ] + uuid = step_dict.get("uuid", None) + if uuid: + if uuid in discovered_uuids: + raise exceptions.DuplicatedIdentifierException("Duplicate step UUID in request.") + discovered_uuids.add(uuid) + label = step_dict.get("label", None) + if label: + if label in discovered_labels: + raise exceptions.DuplicatedIdentifierException("Duplicate step label in request.") + discovered_labels.add(label) + yield step_dict def __module_from_dict( self, trans, step_dict, secure ): diff -r b6f28aec47bd4fd6294a5aa5d1ea19fafcdb4f7e -r 2543786e06148d5ceabc603553828e4040882c7b test/api/helpers.py --- a/test/api/helpers.py +++ b/test/api/helpers.py @@ -198,13 +198,17 @@ return self.create_workflow( workflow, **create_kwds ) def create_workflow( self, workflow, **create_kwds ): + upload_response = self.create_workflow_response( workflow, **create_kwds ) + uploaded_workflow_id = upload_response.json()[ "id" ] + return uploaded_workflow_id + + def create_workflow_response( self, workflow, **create_kwds ): data = dict( workflow=json.dumps( workflow ), **create_kwds ) upload_response = self.galaxy_interactor.post( "workflows/upload", data=data ) - uploaded_workflow_id = upload_response.json()[ "id" ] - return uploaded_workflow_id + return upload_response class LibraryPopulator( object ): diff -r b6f28aec47bd4fd6294a5aa5d1ea19fafcdb4f7e -r 2543786e06148d5ceabc603553828e4040882c7b test/api/test_workflows.py --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -307,6 +307,21 @@ # Make sure the positions have been updated. map(tweak_step, updated_workflow_content['steps'].iteritems()) + def test_require_unique_step_uuids( self ): + workflow_dup_uuids = self.workflow_populator.load_workflow( name="test_import" ) + uuid0 = str(uuid4()) + for step_dict in workflow_dup_uuids["steps"].values(): + step_dict["uuid"] = uuid0 + response = self.workflow_populator.create_workflow_response( workflow_dup_uuids ) + self._assert_status_code_is( response, 400 ) + + def test_require_unique_step_labels( self ): + workflow_dup_label = self.workflow_populator.load_workflow( name="test_import" ) + for step_dict in workflow_dup_label["steps"].values(): + step_dict["label"] = "my duplicated label" + response = self.workflow_populator.create_workflow_response( workflow_dup_label ) + self._assert_status_code_is( response, 400 ) + def test_import_deprecated( self ): workflow_id = self.workflow_populator.simple_workflow( "test_import_published_deprecated", publish=True ) with self._different_user(): https://bitbucket.org/galaxy/galaxy-central/commits/52f58efee2dc/ Changeset: 52f58efee2dc User: jmchilton Date: 2014-12-16 03:20:12+00:00 Summary: Allow specifying workflow parameter replacements by step UUID.
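In practice this lets a run request key its 'parameters' mapping by step UUID instead of by tool id or raw step id; a minimal sketch in the style of the API test in the diff below (the UUID and override values are placeholders, and `_post` is the test suite's helper, so treat this as illustrative rather than a canonical client):

    # Sketch: override per-step tool state by step UUID when running a workflow.
    from json import dumps

    workflow_request[ "parameters" ] = dumps( {
        "58dffcc9-bcb7-4117-a0e1-61513524b3b1": dict( num_lines=4 ),
    } )
    run_workflow_response = self._post( "workflows", data=workflow_request )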
Affected #: 3 files diff -r 2543786e06148d5ceabc603553828e4040882c7b -r 52f58efee2dce225f5aeac7a382896eaad80baa8 lib/galaxy/workflow/run_request.py --- a/lib/galaxy/workflow/run_request.py +++ b/lib/galaxy/workflow/run_request.py @@ -112,6 +112,10 @@ """ param_dict = param_map.get(step.tool_id, {}).copy() param_dict.update(param_map.get(str(step.id), {})) + step_uuid = step.uuid + if step_uuid: + uuid_params = param_map.get(str(step_uuid), {}) + param_dict.update(uuid_params) if param_dict: if 'param' in param_dict and 'value' in param_dict: param_dict[param_dict['param']] = param_dict['value'] diff -r 2543786e06148d5ceabc603553828e4040882c7b -r 52f58efee2dce225f5aeac7a382896eaad80baa8 test/api/test_workflow_2.ga --- a/test/api/test_workflow_2.ga +++ b/test/api/test_workflow_2.ga @@ -25,6 +25,7 @@ "tool_state": "{\"name\": \"Input Dataset\"}", "tool_version": null, "type": "data_input", + "uuid": "58dffcc9-bcb7-4117-a0e1-61513524b3b0", "user_outputs": [] }, "1": { @@ -54,6 +55,7 @@ "tool_state": "{\"__page__\": 0, \"num_lines\": \"\\\"8\\\"\", \"seed_source\": \"{\\\"__current_case__\\\": 0, \\\"seed_source_selector\\\": \\\"no_seed\\\"}\", \"input\": \"null\", \"chromInfo\": \"\\\"/home/john/workspace/galaxy-central-workflows-params/tool-data/shared/ucsc/chrom/?.len\\\"\", \"__rerun_remap_job_id__\": null}", "tool_version": null, "type": "tool", + "uuid": "58dffcc9-bcb7-4117-a0e1-61513524b3b1", "user_outputs": [] }, "2": { @@ -83,6 +85,7 @@ "tool_state": "{\"__page__\": 0, \"num_lines\": \"\\\"6\\\"\", \"seed_source\": \"{\\\"__current_case__\\\": 0, \\\"seed_source_selector\\\": \\\"no_seed\\\"}\", \"input\": \"null\", \"chromInfo\": \"\\\"/home/john/workspace/galaxy-central-workflows-params/tool-data/shared/ucsc/chrom/?.len\\\"\", \"__rerun_remap_job_id__\": null}", "tool_version": null, "type": "tool", + "uuid": "58dffcc9-bcb7-4117-a0e1-61513524b3b2", "user_outputs": [] } } diff -r 2543786e06148d5ceabc603553828e4040882c7b -r 52f58efee2dce225f5aeac7a382896eaad80baa8 test/api/test_workflows.py --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -714,6 +714,20 @@ self.__assert_lines_hid_line_count_is( history_id, 2, 5 ) self.__assert_lines_hid_line_count_is( history_id, 3, 5 ) + @skip_without_tool( "random_lines1" ) + def test_run_replace_params_by_uuid( self ): + workflow_request, history_id = self._setup_random_x2_workflow( "test_for_replace_tool_params" ) + workflow_request[ "parameters" ] = dumps( { + "58dffcc9-bcb7-4117-a0e1-61513524b3b1": dict( num_lines=4 ), + "58dffcc9-bcb7-4117-a0e1-61513524b3b2": dict( num_lines=3 ), + } ) + run_workflow_response = self._post( "workflows", data=workflow_request ) + self._assert_status_code_is( run_workflow_response, 200 ) + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + # Would be 8 and 6 without modification + self.__assert_lines_hid_line_count_is( history_id, 2, 4 ) + self.__assert_lines_hid_line_count_is( history_id, 3, 3 ) + @skip_without_tool( "validation_default" ) def test_parameter_substitution_validation( self ): substitions = dict( input1="\" ; echo \"moo" ) https://bitbucket.org/galaxy/galaxy-central/commits/7ee34c597572/ Changeset: 7ee34c597572 User: jmchilton Date: 2014-12-16 03:20:12+00:00 Summary: Allow specifying workflow inputs by step UUID. Pass in the parameter 'inputs_by' as 'step_uuid' to the workflow run command to use this.
Specifying inputs by UUID has the nice advantage that it survives workflow saves - so if one sets up an API script or something to target a workflow, saving the workflow in the editor doesn't need to break the script as long as inputs were not added or deleted. The UUID (like the order_index) has the advantage over the step id that it is predeterminable - so one can set up a workflow script against any Galaxy and the script doesn't need to be adapted to the raw ids the steps get assigned in that instance. Affected #: 2 files diff -r 52f58efee2dce225f5aeac7a382896eaad80baa8 -r 7ee34c5975728c0f508a4283ef2f1ee35672ab0e lib/galaxy/workflow/run_request.py --- a/lib/galaxy/workflow/run_request.py +++ b/lib/galaxy/workflow/run_request.py @@ -57,6 +57,8 @@ inputs_key = str( step.id ) elif inputs_by == "step_index": inputs_key = str( step.order_index ) + elif inputs_by == "step_uuid": + inputs_key = str( step.uuid ) elif inputs_by == "name": inputs_key = step.tool_inputs.get( 'name', None ) else: @@ -170,6 +172,11 @@ # of Galaxy at the time of workflow import. inputs_by = inputs_by or 'step_index' + # TODO: with a little reworking this default could be + # step_index or step_uuid depending on what is in inputs - + # unlike step_id and step_index there is no chance of + # conflicts. + add_to_history = 'no_add_to_history' not in payload history_param = payload.get('history', '') diff -r 52f58efee2dce225f5aeac7a382896eaad80baa8 -r 7ee34c5975728c0f508a4283ef2f1ee35672ab0e test/api/test_workflows.py --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -78,6 +78,13 @@ elif inputs_by == "name": workflow_request[ "inputs" ] = dumps( label_map ) workflow_request[ "inputs_by" ] = 'name' + elif inputs_by == "step_uuid": + uuid_map = { + workflow["steps"]["0"]["uuid"]: self._ds_entry(hda1), + workflow["steps"]["1"]["uuid"]: self._ds_entry(hda2), + } + workflow_request[ "inputs" ] = dumps( uuid_map ) + workflow_request[ "inputs_by" ] = "step_uuid" return workflow_request, history_id @@ -421,6 +428,10 @@ self.__run_cat_workflow( inputs_by='step_index' ) @skip_without_tool( "cat1" ) + def test_run_workflow_by_uuid( self ): + self.__run_cat_workflow( inputs_by='step_uuid' ) + + @skip_without_tool( "cat1" ) def test_run_workflow_by_name( self ): self.__run_cat_workflow( inputs_by='name' ) @@ -430,11 +441,13 @@ def __run_cat_workflow( self, inputs_by ): workflow = self.workflow_populator.load_workflow( name="test_for_run" ) + workflow["steps"]["0"]["uuid"] = str(uuid4()) + workflow["steps"]["1"]["uuid"] = str(uuid4()) workflow_request, history_id = self._setup_workflow_run( workflow, inputs_by=inputs_by ) # TODO: This should really be a post to workflows/<workflow_id>/run or # something like that. run_workflow_response = self._post( "workflows", data=workflow_request ) - + self._assert_status_code_is( run_workflow_response, 200 ) invocation_id = run_workflow_response.json()[ "id" ] invocation = self._invocation_details( workflow_request[ "workflow_id" ], invocation_id ) assert invocation[ "state" ] == "scheduled", invocation https://bitbucket.org/galaxy/galaxy-central/commits/8d5089f2ba81/ Changeset: 8d5089f2ba81 User: jmchilton Date: 2014-12-16 03:20:12+00:00 Summary: Allow workflow inputs to be specifiable by UUID without explicit inputs_by. Since there is no chance of conflict with step.id or step.order_index, just allow inputs or ds_map respectively to be indexed by UUID.
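Concretely, after this change the 'inputs' mapping (or the legacy 'ds_map') can be keyed directly by step UUID with no 'inputs_by' parameter at all; a hedged sketch in the style of the API tests below (the workflow dict, `_ds_entry` helper, and hda1 dataset come from the test harness and are assumptions here):

    # Sketch: key workflow inputs directly by step UUID - no explicit
    # 'inputs_by' needed, since a UUID cannot collide with a raw step id
    # or an order index.
    from json import dumps

    uuid_map = {
        workflow[ "steps" ][ "0" ][ "uuid" ]: self._ds_entry( hda1 ),
    }
    workflow_request[ "inputs" ] = dumps( uuid_map )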
Affected #: 2 files diff -r 7ee34c5975728c0f508a4283ef2f1ee35672ab0e -r 8d5089f2ba8183439a866026f3583c377f8bfa74 lib/galaxy/workflow/run_request.py --- a/lib/galaxy/workflow/run_request.py +++ b/lib/galaxy/workflow/run_request.py @@ -53,18 +53,25 @@ if step.type not in INPUT_STEP_TYPES: continue - if inputs_by == "step_id": - inputs_key = str( step.id ) - elif inputs_by == "step_index": - inputs_key = str( step.order_index ) - elif inputs_by == "step_uuid": - inputs_key = str( step.uuid ) - elif inputs_by == "name": - inputs_key = step.tool_inputs.get( 'name', None ) - else: - message = "Workflow cannot be run because unexpected inputs_by value specified." - raise exceptions.MessageException( message ) - if inputs_key not in inputs: + possible_input_keys = [] + for inputs_by_el in inputs_by.split("|"): + if inputs_by_el == "step_id": + possible_input_keys.append( str( step.id ) ) + elif inputs_by_el == "step_index": + possible_input_keys.append( str( step.order_index ) ) + elif inputs_by_el == "step_uuid": + possible_input_keys.append( str( step.uuid ) ) + elif inputs_by_el == "name": + possible_input_keys.append( step.tool_inputs.get( 'name', None ) ) + else: + message = "Workflow cannot be run because unexpected inputs_by value specified." + raise exceptions.MessageException( message ) + inputs_key = None + for possible_input_key in possible_input_keys: + if possible_input_key in inputs: + inputs_key = possible_input_key + + if not inputs_key: message = "Workflow cannot be run because an expected input step '%s' has no input dataset." % step.id raise exceptions.MessageException( message ) @@ -164,18 +171,13 @@ # Default to legacy behavior - read ds_map and reference steps # by unencoded step id (a raw database id). inputs = payload.get( 'ds_map', {} ) - inputs_by = inputs_by or 'step_id' + inputs_by = inputs_by or 'step_id|step_uuid' else: inputs = inputs or {} # New default is to reference steps by index of workflow step # which is intrinsic to the workflow and independent of the state # of Galaxy at the time of workflow import. - inputs_by = inputs_by or 'step_index' - - # TODO: with a little reworking this default could be - # step_index or step_uuid depending on what is in inputs - - # unlike step_id and step_index there is no chance of - # conflicts.
+ inputs_by = inputs_by or 'step_index|step_uuid' add_to_history = 'no_add_to_history' not in payload history_param = payload.get('history', '') diff -r 7ee34c5975728c0f508a4283ef2f1ee35672ab0e -r 8d5089f2ba8183439a866026f3583c377f8bfa74 test/api/test_workflows.py --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -78,13 +78,14 @@ elif inputs_by == "name": workflow_request[ "inputs" ] = dumps( label_map ) workflow_request[ "inputs_by" ] = 'name' - elif inputs_by == "step_uuid": + elif inputs_by in [ "step_uuid", "uuid_implicitly" ]: uuid_map = { workflow["steps"]["0"]["uuid"]: self._ds_entry(hda1), workflow["steps"]["1"]["uuid"]: self._ds_entry(hda2), } workflow_request[ "inputs" ] = dumps( uuid_map ) - workflow_request[ "inputs_by" ] = "step_uuid" + if inputs_by == "step_uuid": + workflow_request[ "inputs_by" ] = "step_uuid" return workflow_request, history_id @@ -432,6 +433,10 @@ self.__run_cat_workflow( inputs_by='step_uuid' ) @skip_without_tool( "cat1" ) + def test_run_workflow_by_uuid_implicitly( self ): + self.__run_cat_workflow( inputs_by='uuid_implicitly' ) + + @skip_without_tool( "cat1" ) def test_run_workflow_by_name( self ): self.__run_cat_workflow( inputs_by='name' ) Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.