4 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/2b1b1b0ac9f8/ Changeset: 2b1b1b0ac9f8 User: jmchilton Date: 2014-02-05 22:20:31 Summary: Create replacement dict creation in workflow controllers once. Both recalculated this per step, brings the actual run code closer together - which will make subsequent changeset diff cleaner. Affected #: 2 files diff -r 99beec4c3a338249ab661a7edc3e30ee115486e9 -r 2b1b1b0ac9f8a2f6a452bb41807baac9d3c827ca lib/galaxy/webapps/galaxy/api/workflows.py --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -267,6 +267,9 @@ outputs = util.odict.odict() rval['history'] = trans.security.encode_id(history.id) rval['outputs'] = [] + + replacement_dict = payload.get('replacement_params', {}) + for step in workflow.steps: job = None if step.type == 'tool' or step.type is None: @@ -281,11 +284,9 @@ job, out_data = tool.execute( trans, step.state.inputs, history=history) outputs[ step.id ] = out_data - # Do post-job actions. - replacement_params = payload.get('replacement_params', {}) for pja in step.post_job_actions: if pja.action_type in ActionBox.immediate_actions: - ActionBox.execute(trans.app, trans.sa_session, pja, job, replacement_dict=replacement_params) + ActionBox.execute(trans.app, trans.sa_session, pja, job, replacement_dict=replacement_dict) else: job.add_post_job_action(pja) diff -r 99beec4c3a338249ab661a7edc3e30ee115486e9 -r 2b1b1b0ac9f8a2f6a452bb41807baac9d3c827ca lib/galaxy/webapps/galaxy/controllers/workflow.py --- a/lib/galaxy/webapps/galaxy/controllers/workflow.py +++ b/lib/galaxy/webapps/galaxy/controllers/workflow.py @@ -1373,6 +1373,13 @@ workflow_invocation = model.WorkflowInvocation() workflow_invocation.workflow = workflow outputs = odict() + + # Build replacement dict for this workflow execution. 
+ replacement_dict = {} + for k, v in kwargs.iteritems(): + if k.startswith('wf_parm|'): + replacement_dict[k[8:]] = v + for i, step in enumerate( workflow.steps ): # Execute module job = None @@ -1401,10 +1408,6 @@ # Create new PJA associations with the created job, to be run on completion. # PJA Parameter Replacement (only applies to immediate actions-- rename specifically, for now) # Pass along replacement dict with the execution of the PJA so we don't have to modify the object. - replacement_dict = {} - for k, v in kwargs.iteritems(): - if k.startswith('wf_parm|'): - replacement_dict[k[8:]] = v for pja in step.post_job_actions: if pja.action_type in ActionBox.immediate_actions: ActionBox.execute(trans.app, trans.sa_session, pja, job, replacement_dict) https://bitbucket.org/galaxy/galaxy-central/commits/b8f7b6c1daf3/ Changeset: b8f7b6c1daf3 User: jmchilton Date: 2014-02-05 22:20:31 Summary: Refactor out common code for running workflows. This probably results in some bug fixes for the API workflow running - such as running tools with multiple=true data input parameters. 
Affected #: 3 files diff -r 2b1b1b0ac9f8a2f6a452bb41807baac9d3c827ca -r b8f7b6c1daf3834b71dfc1fb73b3d8004b924211 lib/galaxy/webapps/galaxy/api/workflows.py --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -9,11 +9,10 @@ from galaxy import exceptions from galaxy import util from galaxy import web -from galaxy.tools.parameters import visit_input_values, DataToolParameter from galaxy.web import _future_expose_api as expose_api from galaxy.web.base.controller import BaseAPIController, url_for, UsesStoredWorkflowMixin from galaxy.workflow.modules import module_factory -from galaxy.jobs.actions.post import ActionBox +from galaxy.workflow.run import invoke log = logging.getLogger(__name__) @@ -237,6 +236,14 @@ rval = {} for step in workflow.steps: step_errors = None + input_connections_by_name = {} + for conn in step.input_connections: + input_name = conn.input_name + if not input_name in input_connections_by_name: + input_connections_by_name[input_name] = [] + input_connections_by_name[input_name].append(conn) + step.input_connections_by_name = input_connections_by_name + if step.type == 'tool' or step.type is None: step.module = module_factory.from_workflow_step( trans, step ) # Check for missing parameters @@ -259,50 +266,30 @@ return "Workflow cannot be run because an expected input step '%s' has no input dataset." 
% step.id step.module = module_factory.from_workflow_step( trans, step ) step.state = step.module.get_runtime_state() - step.input_connections_by_name = dict( ( conn.input_name, conn ) for conn in step.input_connections ) # Run each step, connecting outputs to inputs - workflow_invocation = self.app.model.WorkflowInvocation() - workflow_invocation.workflow = workflow outputs = util.odict.odict() rval['history'] = trans.security.encode_id(history.id) rval['outputs'] = [] replacement_dict = payload.get('replacement_params', {}) + outputs = invoke( + trans=trans, + workflow=workflow, + target_history=history, + replacement_dict=replacement_dict, + ds_map=ds_map, + ) + trans.sa_session.flush() + + # Build legacy output - should probably include more information from + # outputs. for step in workflow.steps: - job = None if step.type == 'tool' or step.type is None: - tool = self.app.toolbox.get_tool( step.tool_id ) + for v in outputs[ step.id ].itervalues(): + rval[ 'outputs' ].append( trans.security.encode_id( v.id ) ) - def callback( input, value, prefixed_name, prefixed_label ): - if isinstance( input, DataToolParameter ): - if prefixed_name in step.input_connections_by_name: - conn = step.input_connections_by_name[ prefixed_name ] - return outputs[ conn.output_step.id ][ conn.output_name ] - visit_input_values( tool.inputs, step.state.inputs, callback ) - job, out_data = tool.execute( trans, step.state.inputs, history=history) - outputs[ step.id ] = out_data - - for pja in step.post_job_actions: - if pja.action_type in ActionBox.immediate_actions: - ActionBox.execute(trans.app, trans.sa_session, pja, job, replacement_dict=replacement_dict) - else: - job.add_post_job_action(pja) - - for v in out_data.itervalues(): - rval['outputs'].append(trans.security.encode_id(v.id)) - else: - #This is an input step. Use the dataset inputs from ds_map. 
- job, out_data = step.module.execute( trans, step.state) - outputs[step.id] = out_data - outputs[step.id]['output'] = ds_map[str(step.id)]['hda'] - workflow_invocation_step = self.app.model.WorkflowInvocationStep() - workflow_invocation_step.workflow_invocation = workflow_invocation - workflow_invocation_step.workflow_step = step - workflow_invocation_step.job = job - trans.sa_session.add( workflow_invocation ) - trans.sa_session.flush() return rval @web.expose_api diff -r 2b1b1b0ac9f8a2f6a452bb41807baac9d3c827ca -r b8f7b6c1daf3834b71dfc1fb73b3d8004b924211 lib/galaxy/webapps/galaxy/controllers/workflow.py --- a/lib/galaxy/webapps/galaxy/controllers/workflow.py +++ b/lib/galaxy/webapps/galaxy/controllers/workflow.py @@ -18,7 +18,6 @@ from galaxy import util from galaxy import web from galaxy.datatypes.data import Data -from galaxy.jobs.actions.post import ActionBox from galaxy.model.item_attrs import UsesItemRatings from galaxy.model.mapping import desc from galaxy.tools.parameters import visit_input_values @@ -33,6 +32,7 @@ from galaxy.web.framework.helpers import grids, time_ago from galaxy.web.framework.helpers import to_unicode from galaxy.workflow.modules import module_factory +from galaxy.workflow.run import invoke class StoredWorkflowListGrid( grids.Grid ): @@ -1369,10 +1369,6 @@ target_history = new_history else: target_history = trans.get_history() - # Run each step, connecting outputs to inputs - workflow_invocation = model.WorkflowInvocation() - workflow_invocation.workflow = workflow - outputs = odict() # Build replacement dict for this workflow execution. 
replacement_dict = {} @@ -1380,54 +1376,14 @@ if k.startswith('wf_parm|'): replacement_dict[k[8:]] = v - for i, step in enumerate( workflow.steps ): - # Execute module - job = None - if step.type == 'tool' or step.type is None: - tool = trans.app.toolbox.get_tool( step.tool_id ) + outputs = invoke( + trans=trans, + workflow=workflow, + target_history=target_history, + replacement_dict=replacement_dict, + copy_inputs_to_history=new_history is not None + ) - # Connect up - def callback( input, value, prefixed_name, prefixed_label ): - replacement = None - if isinstance( input, DataToolParameter ): - if prefixed_name in step.input_connections_by_name: - conn = step.input_connections_by_name[ prefixed_name ] - if input.multiple: - replacement = [outputs[ c.output_step.id ][ c.output_name ] for c in conn] - else: - replacement = outputs[ conn[0].output_step.id ][ conn[0].output_name ] - return replacement - try: - # Replace DummyDatasets with historydatasetassociations - visit_input_values( tool.inputs, step.state.inputs, callback ) - except KeyError, k: - error( "Error due to input mapping of '%s' in '%s'. A common cause of this is conditional outputs that cannot be determined until runtime, please review your workflow." % (tool.name, k.message)) - # Execute it - job, out_data = tool.execute( trans, step.state.inputs, history=target_history) - outputs[ step.id ] = out_data - # Create new PJA associations with the created job, to be run on completion. - # PJA Parameter Replacement (only applies to immediate actions-- rename specifically, for now) - # Pass along replacement dict with the execution of the PJA so we don't have to modify the object. 
- for pja in step.post_job_actions: - if pja.action_type in ActionBox.immediate_actions: - ActionBox.execute(trans.app, trans.sa_session, pja, job, replacement_dict) - else: - job.add_post_job_action(pja) - else: - job, out_data = step.module.execute( trans, step.state ) - outputs[ step.id ] = out_data - if new_history: - for input_dataset_hda in out_data.values(): - new_hda = input_dataset_hda.copy( copy_children=True ) - new_history.add_dataset(new_hda) - outputs[ step.id ]['input_ds_copy'] = new_hda - # Record invocation - workflow_invocation_step = model.WorkflowInvocationStep() - workflow_invocation_step.workflow_invocation = workflow_invocation - workflow_invocation_step.workflow_step = step - workflow_invocation_step.job = job - # All jobs ran successfully, so we can save now - trans.sa_session.add( workflow_invocation ) invocations.append({'outputs': outputs, 'new_history': new_history}) trans.sa_session.flush() diff -r 2b1b1b0ac9f8a2f6a452bb41807baac9d3c827ca -r b8f7b6c1daf3834b71dfc1fb73b3d8004b924211 lib/galaxy/workflow/run.py --- /dev/null +++ b/lib/galaxy/workflow/run.py @@ -0,0 +1,108 @@ +from galaxy import model +from galaxy import exceptions + +from galaxy.jobs.actions.post import ActionBox + +from galaxy.tools.parameters.basic import DataToolParameter +from galaxy.tools.parameters import visit_input_values +from galaxy.util.odict import odict + + +def invoke( trans, workflow, target_history, replacement_dict, copy_inputs_to_history=False, ds_map={} ): + """ Run the supplied workflow in the supplied target_history. 
+ """ + return WorkflowInvoker( + trans, + workflow, + target_history, + replacement_dict, + copy_inputs_to_history=copy_inputs_to_history, + ds_map=ds_map, + ).invoke() + + +class WorkflowInvoker( object ): + + def __init__( self, trans, workflow, target_history, replacement_dict, copy_inputs_to_history, ds_map ): + self.trans = trans + self.workflow = workflow + self.target_history = target_history + self.replacement_dict = replacement_dict + self.copy_inputs_to_history = copy_inputs_to_history + self.ds_map = ds_map + + self.outputs = odict() + + def invoke( self ): + workflow_invocation = model.WorkflowInvocation() + workflow_invocation.workflow = self.workflow + + for step in self.workflow.steps: + job = None + job = self._invoke_step( step ) + # Record invocation + workflow_invocation_step = model.WorkflowInvocationStep() + workflow_invocation_step.workflow_invocation = workflow_invocation + workflow_invocation_step.workflow_step = step + workflow_invocation_step.job = job + + # All jobs ran successfully, so we can save now + self.trans.sa_session.add( workflow_invocation ) + + # Not flushing in here, because web controller may create multiple + # invokations. 
+ return self.outputs + + def _invoke_step( self, step ): + trans = self.trans + outputs = self.outputs + target_history = self.target_history + replacement_dict = self.replacement_dict + + if step.type == 'tool' or step.type is None: + tool = trans.app.toolbox.get_tool( step.tool_id ) + + # Connect up + def callback( input, value, prefixed_name, prefixed_label ): + replacement = None + if isinstance( input, DataToolParameter ): + if prefixed_name in step.input_connections_by_name: + conn = step.input_connections_by_name[ prefixed_name ] + if input.multiple: + replacement = [outputs[ c.output_step.id ][ c.output_name ] for c in conn] + else: + replacement = outputs[ conn[0].output_step.id ][ conn[0].output_name ] + return replacement + try: + # Replace DummyDatasets with historydatasetassociations + visit_input_values( tool.inputs, step.state.inputs, callback ) + except KeyError, k: + raise exceptions.MessageException( "Error due to input mapping of '%s' in '%s'. A common cause of this is conditional outputs that cannot be determined until runtime, please review your workflow." % (tool.name, k.message)) + # Execute it + job, out_data = tool.execute( trans, step.state.inputs, history=target_history) + outputs[ step.id ] = out_data + # Create new PJA associations with the created job, to be run on completion. + # PJA Parameter Replacement (only applies to immediate actions-- rename specifically, for now) + # Pass along replacement dict with the execution of the PJA so we don't have to modify the object. + for pja in step.post_job_actions: + if pja.action_type in ActionBox.immediate_actions: + ActionBox.execute( trans.app, trans.sa_session, pja, job, replacement_dict ) + else: + job.add_post_job_action(pja) + else: + job, out_data = step.module.execute( trans, step.state ) + outputs[ step.id ] = out_data + + # Web controller may set copy_inputs_to_history, API controller always sets + # ds_map. 
+ if self.copy_inputs_to_history: + for input_dataset_hda in out_data.values(): + new_hda = input_dataset_hda.copy( copy_children=True ) + target_history.add_dataset(new_hda) + outputs[ step.id ]['input_ds_copy'] = new_hda + if self.ds_map: + outputs[step.id]['output'] = self.ds_map[ str( step.id ) ][ 'hda' ] + + return job + +__all__ = [ invoke ] https://bitbucket.org/galaxy/galaxy-central/commits/f263f13f1e6c/ Changeset: f263f13f1e6c User: jmchilton Date: 2014-02-05 22:20:31 Summary: Refactor 2 paths of workflow step invokation into helper functions. Affected #: 1 file diff -r b8f7b6c1daf3834b71dfc1fb73b3d8004b924211 -r f263f13f1e6c52de6f9b2990a9fe60cf1eef714d lib/galaxy/workflow/run.py --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -54,55 +54,66 @@ return self.outputs def _invoke_step( self, step ): - trans = self.trans - outputs = self.outputs - target_history = self.target_history - replacement_dict = self.replacement_dict - if step.type == 'tool' or step.type is None: - tool = trans.app.toolbox.get_tool( step.tool_id ) - - # Connect up - def callback( input, value, prefixed_name, prefixed_label ): - replacement = None - if isinstance( input, DataToolParameter ): - if prefixed_name in step.input_connections_by_name: - conn = step.input_connections_by_name[ prefixed_name ] - if input.multiple: - replacement = [outputs[ c.output_step.id ][ c.output_name ] for c in conn] - else: - replacement = outputs[ conn[0].output_step.id ][ conn[0].output_name ] - return replacement - try: - # Replace DummyDatasets with historydatasetassociations - visit_input_values( tool.inputs, step.state.inputs, callback ) - except KeyError, k: - raise exceptions.MessageException( "Error due to input mapping of '%s' in '%s'. A common cause of this is conditional outputs that cannot be determined until runtime, please review your workflow." 
% (tool.name, k.message)) - # Execute it - job, out_data = tool.execute( trans, step.state.inputs, history=target_history) - outputs[ step.id ] = out_data - # Create new PJA associations with the created job, to be run on completion. - # PJA Parameter Replacement (only applies to immediate actions-- rename specifically, for now) - # Pass along replacement dict with the execution of the PJA so we don't have to modify the object. - for pja in step.post_job_actions: - if pja.action_type in ActionBox.immediate_actions: - ActionBox.execute( trans.app, trans.sa_session, pja, job, replacement_dict ) - else: - job.add_post_job_action(pja) + job = self._execute_tool_step( step ) else: - job, out_data = step.module.execute( trans, step.state ) - outputs[ step.id ] = out_data - - # Web controller may set copy_inputs_to_history, API controller always sets - # ds_map. - if self.copy_inputs_to_history: - for input_dataset_hda in out_data.values(): - new_hda = input_dataset_hda.copy( copy_children=True ) - target_history.add_dataset(new_hda) - outputs[ step.id ]['input_ds_copy'] = new_hda - if self.ds_map: - outputs[step.id]['output'] = self.ds_map[ str( step.id ) ][ 'hda' ] + job = self._execute_input_step( step ) return job + def _execute_tool_step( self, step ): + trans = self.trans + outputs = self.outputs + replacement_dict = self.replacement_dict + + tool = trans.app.toolbox.get_tool( step.tool_id ) + + # Connect up + def callback( input, value, prefixed_name, prefixed_label ): + replacement = None + if isinstance( input, DataToolParameter ): + if prefixed_name in step.input_connections_by_name: + conn = step.input_connections_by_name[ prefixed_name ] + if input.multiple: + replacement = [outputs[ c.output_step.id ][ c.output_name ] for c in conn] + else: + replacement = outputs[ conn[0].output_step.id ][ conn[0].output_name ] + return replacement + try: + # Replace DummyDatasets with historydatasetassociations + visit_input_values( tool.inputs, step.state.inputs, callback 
) + except KeyError, k: + raise exceptions.MessageException( "Error due to input mapping of '%s' in '%s'. A common cause of this is conditional outputs that cannot be determined until runtime, please review your workflow." % (tool.name, k.message)) + # Execute it + job, out_data = tool.execute( trans, step.state.inputs, history=self.target_history) + outputs[ step.id ] = out_data + # Create new PJA associations with the created job, to be run on completion. + # PJA Parameter Replacement (only applies to immediate actions-- rename specifically, for now) + # Pass along replacement dict with the execution of the PJA so we don't have to modify the object. + for pja in step.post_job_actions: + if pja.action_type in ActionBox.immediate_actions: + ActionBox.execute( trans.app, trans.sa_session, pja, job, replacement_dict ) + else: + job.add_post_job_action(pja) + + return job + + def _execute_input_step( self, step ): + trans = self.trans + outputs = self.outputs + + job, out_data = step.module.execute( trans, step.state ) + outputs[ step.id ] = out_data + + # Web controller may set copy_inputs_to_history, API controller always sets + # ds_map. 
+ if self.copy_inputs_to_history: + for input_dataset_hda in out_data.values(): + new_hda = input_dataset_hda.copy( copy_children=True ) + self.target_history.add_dataset(new_hda) + outputs[ step.id ]['input_ds_copy'] = new_hda + if self.ds_map: + outputs[step.id]['output'] = self.ds_map[ str( step.id ) ][ 'hda' ] + return job + __all__ = [ invoke ] https://bitbucket.org/galaxy/galaxy-central/commits/c2fbf6fc1213/ Changeset: c2fbf6fc1213 User: jmchilton Date: 2014-02-06 22:00:51 Summary: Merged in jmchilton/galaxy-central-fork-1 (pull request #320) Refactor common workflow run functionality out of controllers Affected #: 3 files diff -r b2206f4ca10f8cec7c37f43f80fc24f0b93b420b -r c2fbf6fc1213eda63f56611635857e58388e2f0e lib/galaxy/webapps/galaxy/api/workflows.py --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -9,11 +9,10 @@ from galaxy import exceptions from galaxy import util from galaxy import web -from galaxy.tools.parameters import visit_input_values, DataToolParameter from galaxy.web import _future_expose_api as expose_api from galaxy.web.base.controller import BaseAPIController, url_for, UsesStoredWorkflowMixin from galaxy.workflow.modules import module_factory -from galaxy.jobs.actions.post import ActionBox +from galaxy.workflow.run import invoke log = logging.getLogger(__name__) @@ -237,6 +236,14 @@ rval = {} for step in workflow.steps: step_errors = None + input_connections_by_name = {} + for conn in step.input_connections: + input_name = conn.input_name + if not input_name in input_connections_by_name: + input_connections_by_name[input_name] = [] + input_connections_by_name[input_name].append(conn) + step.input_connections_by_name = input_connections_by_name + if step.type == 'tool' or step.type is None: step.module = module_factory.from_workflow_step( trans, step ) # Check for missing parameters @@ -259,49 +266,30 @@ return "Workflow cannot be run because an expected input step '%s' has no input 
dataset." % step.id step.module = module_factory.from_workflow_step( trans, step ) step.state = step.module.get_runtime_state() - step.input_connections_by_name = dict( ( conn.input_name, conn ) for conn in step.input_connections ) # Run each step, connecting outputs to inputs - workflow_invocation = self.app.model.WorkflowInvocation() - workflow_invocation.workflow = workflow outputs = util.odict.odict() rval['history'] = trans.security.encode_id(history.id) rval['outputs'] = [] + + replacement_dict = payload.get('replacement_params', {}) + + outputs = invoke( + trans=trans, + workflow=workflow, + target_history=history, + replacement_dict=replacement_dict, + ds_map=ds_map, + ) + trans.sa_session.flush() + + # Build legacy output - should probably include more information from + # outputs. for step in workflow.steps: - job = None if step.type == 'tool' or step.type is None: - tool = self.app.toolbox.get_tool( step.tool_id ) + for v in outputs[ step.id ].itervalues(): + rval[ 'outputs' ].append( trans.security.encode_id( v.id ) ) - def callback( input, value, prefixed_name, prefixed_label ): - if isinstance( input, DataToolParameter ): - if prefixed_name in step.input_connections_by_name: - conn = step.input_connections_by_name[ prefixed_name ] - return outputs[ conn.output_step.id ][ conn.output_name ] - visit_input_values( tool.inputs, step.state.inputs, callback ) - job, out_data = tool.execute( trans, step.state.inputs, history=history) - outputs[ step.id ] = out_data - - # Do post-job actions. - replacement_params = payload.get('replacement_params', {}) - for pja in step.post_job_actions: - if pja.action_type in ActionBox.immediate_actions: - ActionBox.execute(trans.app, trans.sa_session, pja, job, replacement_dict=replacement_params) - else: - job.add_post_job_action(pja) - - for v in out_data.itervalues(): - rval['outputs'].append(trans.security.encode_id(v.id)) - else: - #This is an input step. Use the dataset inputs from ds_map. 
- job, out_data = step.module.execute( trans, step.state) - outputs[step.id] = out_data - outputs[step.id]['output'] = ds_map[str(step.id)]['hda'] - workflow_invocation_step = self.app.model.WorkflowInvocationStep() - workflow_invocation_step.workflow_invocation = workflow_invocation - workflow_invocation_step.workflow_step = step - workflow_invocation_step.job = job - trans.sa_session.add( workflow_invocation ) - trans.sa_session.flush() return rval @web.expose_api diff -r b2206f4ca10f8cec7c37f43f80fc24f0b93b420b -r c2fbf6fc1213eda63f56611635857e58388e2f0e lib/galaxy/webapps/galaxy/controllers/workflow.py --- a/lib/galaxy/webapps/galaxy/controllers/workflow.py +++ b/lib/galaxy/webapps/galaxy/controllers/workflow.py @@ -18,7 +18,6 @@ from galaxy import util from galaxy import web from galaxy.datatypes.data import Data -from galaxy.jobs.actions.post import ActionBox from galaxy.model.item_attrs import UsesItemRatings from galaxy.model.mapping import desc from galaxy.tools.parameters import visit_input_values @@ -33,6 +32,7 @@ from galaxy.web.framework.helpers import grids, time_ago from galaxy.web.framework.helpers import to_unicode from galaxy.workflow.modules import module_factory +from galaxy.workflow.run import invoke class StoredWorkflowListGrid( grids.Grid ): @@ -1369,62 +1369,21 @@ target_history = new_history else: target_history = trans.get_history() - # Run each step, connecting outputs to inputs - workflow_invocation = model.WorkflowInvocation() - workflow_invocation.workflow = workflow - outputs = odict() - for i, step in enumerate( workflow.steps ): - # Execute module - job = None - if step.type == 'tool' or step.type is None: - tool = trans.app.toolbox.get_tool( step.tool_id ) - # Connect up - def callback( input, value, prefixed_name, prefixed_label ): - replacement = None - if isinstance( input, DataToolParameter ): - if prefixed_name in step.input_connections_by_name: - conn = step.input_connections_by_name[ prefixed_name ] - if input.multiple: - 
replacement = [outputs[ c.output_step.id ][ c.output_name ] for c in conn] - else: - replacement = outputs[ conn[0].output_step.id ][ conn[0].output_name ] - return replacement - try: - # Replace DummyDatasets with historydatasetassociations - visit_input_values( tool.inputs, step.state.inputs, callback ) - except KeyError, k: - error( "Error due to input mapping of '%s' in '%s'. A common cause of this is conditional outputs that cannot be determined until runtime, please review your workflow." % (tool.name, k.message)) - # Execute it - job, out_data = tool.execute( trans, step.state.inputs, history=target_history) - outputs[ step.id ] = out_data - # Create new PJA associations with the created job, to be run on completion. - # PJA Parameter Replacement (only applies to immediate actions-- rename specifically, for now) - # Pass along replacement dict with the execution of the PJA so we don't have to modify the object. - replacement_dict = {} - for k, v in kwargs.iteritems(): - if k.startswith('wf_parm|'): - replacement_dict[k[8:]] = v - for pja in step.post_job_actions: - if pja.action_type in ActionBox.immediate_actions: - ActionBox.execute(trans.app, trans.sa_session, pja, job, replacement_dict) - else: - job.add_post_job_action(pja) - else: - job, out_data = step.module.execute( trans, step.state ) - outputs[ step.id ] = out_data - if new_history: - for input_dataset_hda in out_data.values(): - new_hda = input_dataset_hda.copy( copy_children=True ) - new_history.add_dataset(new_hda) - outputs[ step.id ]['input_ds_copy'] = new_hda - # Record invocation - workflow_invocation_step = model.WorkflowInvocationStep() - workflow_invocation_step.workflow_invocation = workflow_invocation - workflow_invocation_step.workflow_step = step - workflow_invocation_step.job = job - # All jobs ran successfully, so we can save now - trans.sa_session.add( workflow_invocation ) + # Build replacement dict for this workflow execution. 
+ replacement_dict = {} + for k, v in kwargs.iteritems(): + if k.startswith('wf_parm|'): + replacement_dict[k[8:]] = v + + outputs = invoke( + trans=trans, + workflow=workflow, + target_history=target_history, + replacement_dict=replacement_dict, + copy_inputs_to_history=new_history is not None + ) + invocations.append({'outputs': outputs, 'new_history': new_history}) trans.sa_session.flush() diff -r b2206f4ca10f8cec7c37f43f80fc24f0b93b420b -r c2fbf6fc1213eda63f56611635857e58388e2f0e lib/galaxy/workflow/run.py --- /dev/null +++ b/lib/galaxy/workflow/run.py @@ -0,0 +1,119 @@ +from galaxy import model +from galaxy import exceptions + +from galaxy.jobs.actions.post import ActionBox + +from galaxy.tools.parameters.basic import DataToolParameter +from galaxy.tools.parameters import visit_input_values +from galaxy.util.odict import odict + + +def invoke( trans, workflow, target_history, replacement_dict, copy_inputs_to_history=False, ds_map={} ): + """ Run the supplied workflow in the supplied target_history. 
+ """ + return WorkflowInvoker( + trans, + workflow, + target_history, + replacement_dict, + copy_inputs_to_history=copy_inputs_to_history, + ds_map=ds_map, + ).invoke() + + +class WorkflowInvoker( object ): + + def __init__( self, trans, workflow, target_history, replacement_dict, copy_inputs_to_history, ds_map ): + self.trans = trans + self.workflow = workflow + self.target_history = target_history + self.replacement_dict = replacement_dict + self.copy_inputs_to_history = copy_inputs_to_history + self.ds_map = ds_map + + self.outputs = odict() + + def invoke( self ): + workflow_invocation = model.WorkflowInvocation() + workflow_invocation.workflow = self.workflow + + for step in self.workflow.steps: + job = None + job = self._invoke_step( step ) + # Record invocation + workflow_invocation_step = model.WorkflowInvocationStep() + workflow_invocation_step.workflow_invocation = workflow_invocation + workflow_invocation_step.workflow_step = step + workflow_invocation_step.job = job + + # All jobs ran successfully, so we can save now + self.trans.sa_session.add( workflow_invocation ) + + # Not flushing in here, because web controller may create multiple + # invokations. 
+ return self.outputs + + def _invoke_step( self, step ): + if step.type == 'tool' or step.type is None: + job = self._execute_tool_step( step ) + else: + job = self._execute_input_step( step ) + + return job + + def _execute_tool_step( self, step ): + trans = self.trans + outputs = self.outputs + replacement_dict = self.replacement_dict + + tool = trans.app.toolbox.get_tool( step.tool_id ) + + # Connect up + def callback( input, value, prefixed_name, prefixed_label ): + replacement = None + if isinstance( input, DataToolParameter ): + if prefixed_name in step.input_connections_by_name: + conn = step.input_connections_by_name[ prefixed_name ] + if input.multiple: + replacement = [outputs[ c.output_step.id ][ c.output_name ] for c in conn] + else: + replacement = outputs[ conn[0].output_step.id ][ conn[0].output_name ] + return replacement + try: + # Replace DummyDatasets with historydatasetassociations + visit_input_values( tool.inputs, step.state.inputs, callback ) + except KeyError, k: + raise exceptions.MessageException( "Error due to input mapping of '%s' in '%s'. A common cause of this is conditional outputs that cannot be determined until runtime, please review your workflow." % (tool.name, k.message)) + # Execute it + job, out_data = tool.execute( trans, step.state.inputs, history=self.target_history) + outputs[ step.id ] = out_data + # Create new PJA associations with the created job, to be run on completion. + # PJA Parameter Replacement (only applies to immediate actions-- rename specifically, for now) + # Pass along replacement dict with the execution of the PJA so we don't have to modify the object. 
+ for pja in step.post_job_actions: + if pja.action_type in ActionBox.immediate_actions: + ActionBox.execute( trans.app, trans.sa_session, pja, job, replacement_dict ) + else: + job.add_post_job_action(pja) + + return job + + def _execute_input_step( self, step ): + trans = self.trans + outputs = self.outputs + + job, out_data = step.module.execute( trans, step.state ) + outputs[ step.id ] = out_data + + # Web controller may set copy_inputs_to_history, API controller always sets + # ds_map. + if self.copy_inputs_to_history: + for input_dataset_hda in out_data.values(): + new_hda = input_dataset_hda.copy( copy_children=True ) + self.target_history.add_dataset(new_hda) + outputs[ step.id ]['input_ds_copy'] = new_hda + if self.ds_map: + outputs[step.id]['output'] = self.ds_map[ str( step.id ) ][ 'hda' ] + return job + +__all__ = [ invoke ] Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.