2 new commits in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/commits/4a7abc7892db/
Changeset:   4a7abc7892db
User:        jmchilton
Date:        2014-04-19 18:40:19
Summary:     Refactor galaxy.workflow.run interface to use config object.
             Number of ways to parameterize workflow is a little bit sprawling already and it is about to get worse.
Affected #:  3 files

diff -r 4b430f408ac762a6f2a6856fb199a347bc693fbb -r 4a7abc7892db4d6c9046b1ad46e8b04925b052e3 lib/galaxy/webapps/galaxy/api/workflows.py
--- a/lib/galaxy/webapps/galaxy/api/workflows.py
+++ b/lib/galaxy/webapps/galaxy/api/workflows.py
@@ -14,6 +14,7 @@
 from galaxy.web.base.controller import UsesHistoryMixin
 from galaxy.workflow.modules import module_factory
 from galaxy.workflow.run import invoke
+from galaxy.workflow.run import WorkflowRunConfig
 from galaxy.workflow.extract import extract_workflow
@@ -325,12 +326,17 @@
         # Run each step, connecting outputs to inputs
         replacement_dict = payload.get('replacement_params', {})
+
+        run_config = WorkflowRunConfig(
+            target_history=history,
+            replacement_dict=replacement_dict,
+            ds_map=ds_map,
+        )
+
         outputs = invoke(
             trans=trans,
             workflow=workflow,
-            target_history=history,
-            replacement_dict=replacement_dict,
-            ds_map=ds_map,
+            workflow_run_config=run_config,
         )
         trans.sa_session.flush()

diff -r 4b430f408ac762a6f2a6856fb199a347bc693fbb -r 4a7abc7892db4d6c9046b1ad46e8b04925b052e3 lib/galaxy/webapps/galaxy/controllers/workflow.py
--- a/lib/galaxy/webapps/galaxy/controllers/workflow.py
+++ b/lib/galaxy/webapps/galaxy/controllers/workflow.py
@@ -29,6 +29,7 @@
 from galaxy.web.framework.helpers import to_unicode
 from galaxy.workflow.modules import module_factory
 from galaxy.workflow.run import invoke
+from galaxy.workflow.run import WorkflowRunConfig
 from galaxy.workflow.extract import summarize
 from galaxy.workflow.extract import extract_workflow
 from galaxy.workflow.steps import (
@@ -1322,12 +1323,16 @@
                 if k.startswith('wf_parm|'):
                     replacement_dict[k[8:]] = v

+                run_config = WorkflowRunConfig(
+                    target_history=target_history,
+                    replacement_dict=replacement_dict,
+                    copy_inputs_to_history=new_history is not None
+                )
+
                 outputs = invoke(
                     trans=trans,
                     workflow=workflow,
-                    target_history=target_history,
-                    replacement_dict=replacement_dict,
-                    copy_inputs_to_history=new_history is not None
+                    workflow_run_config=run_config
                 )
                 invocations.append({'outputs': outputs,

diff -r 4b430f408ac762a6f2a6856fb199a347bc693fbb -r 4a7abc7892db4d6c9046b1ad46e8b04925b052e3 lib/galaxy/workflow/run.py
--- a/lib/galaxy/workflow/run.py
+++ b/lib/galaxy/workflow/run.py
@@ -8,28 +8,50 @@
 from galaxy.util.odict import odict


-def invoke( trans, workflow, target_history, replacement_dict, copy_inputs_to_history=False, ds_map={} ):
+class WorkflowRunConfig( object ):
+    """ Wrapper around all the ways a workflow execution can be parameterized.
+
+    :param target_history: History to execute workflow in.
+    :type target_history: galaxy.model.History.
+
+    :param replacement_dict: Workflow level parameters used for renaming post
+        job actions.
+    :type replacement_dict: dict
+
+    :param copy_inputs_to_history: Should input data parameters be copied to
+        target_history. (Defaults to False)
+    :type copy_inputs_to_history: bool
+
+    :param ds_map: Map from step ids to dicts containing HDA for these steps.
+    :type ds_map: dict
+    """
+
+    def __init__( self, target_history, replacement_dict, copy_inputs_to_history=False, ds_map={} ):
+        self.target_history = target_history
+        self.replacement_dict = replacement_dict
+        self.copy_inputs_to_history = copy_inputs_to_history
+        self.ds_map = ds_map
+
+
+def invoke( trans, workflow, workflow_run_config ):
     """ Run the supplied workflow in the supplied target_history. """
     return WorkflowInvoker(
         trans,
         workflow,
-        target_history,
-        replacement_dict,
-        copy_inputs_to_history=copy_inputs_to_history,
-        ds_map=ds_map,
+        workflow_run_config,
     ).invoke()


 class WorkflowInvoker( object ):

-    def __init__( self, trans, workflow, target_history, replacement_dict, copy_inputs_to_history, ds_map ):
+    def __init__( self, trans, workflow, workflow_run_config ):
         self.trans = trans
         self.workflow = workflow
-        self.target_history = target_history
-        self.replacement_dict = replacement_dict
-        self.copy_inputs_to_history = copy_inputs_to_history
-        self.ds_map = ds_map
+        self.target_history = workflow_run_config.target_history
+        self.replacement_dict = workflow_run_config.replacement_dict
+        self.copy_inputs_to_history = workflow_run_config.copy_inputs_to_history
+        self.ds_map = workflow_run_config.ds_map

         self.outputs = odict()

@@ -128,4 +150,4 @@
         replacement = outputs[ connection[ 0 ].output_step.id ][ connection[ 0 ].output_name ]
         return replacement

-__all__ = [ invoke ]
+__all__ = [ invoke, WorkflowRunConfig ]
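For callers of galaxy.workflow.run, the practical effect of this changeset is that invoke() now takes a single WorkflowRunConfig rather than a growing list of keyword arguments. A minimal sketch of the new calling convention follows (trans, workflow, history, and input_hda stand in for objects the caller already has; the step id '1' and the replacement key are hypothetical):

    from galaxy.workflow.run import invoke, WorkflowRunConfig

    run_config = WorkflowRunConfig(
        target_history=history,                     # galaxy.model.History to run the workflow in
        replacement_dict={'run_name': 'test run'},  # workflow-level rename parameters
        ds_map={'1': {'hda': input_hda}},           # step id -> dict carrying the input HDA
    )
    outputs = invoke( trans=trans, workflow=workflow, workflow_run_config=run_config )

Defaults such as copy_inputs_to_history=False carry over unchanged, so existing call sites only need to bundle their arguments into the config object.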
https://bitbucket.org/galaxy/galaxy-central/commits/439510f01e3b/
Changeset:   439510f01e3b
User:        jmchilton
Date:        2014-04-19 18:40:19
Summary:     Refactor workflow API logic related to module population into run module.
             Also moves runtime parameter override logic into that module as well. This is a good refactoring because it thins out a fat controller (slightly) - but it is of particular interest because it makes it easier to recursively invoke potentially nested workflows from the workflow running module itself. It is still an open question whether that is desirable - see discussion on Pull Request #370 - but if it proves to be, this can help enable it.
Affected #:  2 files

diff -r 4a7abc7892db4d6c9046b1ad46e8b04925b052e3 -r 439510f01e3b0850fd844cded14977076ec86501 lib/galaxy/webapps/galaxy/api/workflows.py
--- a/lib/galaxy/webapps/galaxy/api/workflows.py
+++ b/lib/galaxy/webapps/galaxy/api/workflows.py
@@ -21,39 +21,6 @@
 log = logging.getLogger(__name__)


-def _update_step_parameters(step, param_map):
-    """
-    Update ``step`` parameters based on the user-provided ``param_map`` dict.
-
-    ``param_map`` should be structured as follows::
-
-      PARAM_MAP = {STEP_ID: PARAM_DICT, ...}
-      PARAM_DICT = {NAME: VALUE, ...}
-
-    For backwards compatibility, the following (deprecated) format is
-    also supported for ``param_map``::
-
-      PARAM_MAP = {TOOL_ID: PARAM_DICT, ...}
-
-    in which case PARAM_DICT affects all steps with the given tool id.
-    If both by-tool-id and by-step-id specifications are used, the
-    latter takes precedence.
-
-    Finally (again, for backwards compatibility), PARAM_DICT can also
-    be specified as::
-
-      PARAM_DICT = {'param': NAME, 'value': VALUE}
-
-    Note that this format allows only one parameter to be set per step.
-    """
-    param_dict = param_map.get(step.tool_id, {}).copy()
-    param_dict.update(param_map.get(str(step.id), {}))
-    if param_dict:
-        if 'param' in param_dict and 'value' in param_dict:
-            param_dict[param_dict['param']] = param_dict['value']
-        step.state.inputs.update(param_dict)
-
-
 class WorkflowsAPIController(BaseAPIController, UsesStoredWorkflowMixin, UsesHistoryMixin):

     @web.expose_api
@@ -290,40 +257,6 @@
                 trans.response.status = 400
                 return "Invalid Dataset '%s' Specified" % ds_map[k]['id']

-        # Build the state for each step
-        for step in workflow.steps:
-            step_errors = None
-            input_connections_by_name = {}
-            for conn in step.input_connections:
-                input_name = conn.input_name
-                if not input_name in input_connections_by_name:
-                    input_connections_by_name[input_name] = []
-                input_connections_by_name[input_name].append(conn)
-            step.input_connections_by_name = input_connections_by_name
-
-            if step.type == 'tool' or step.type is None:
-                step.module = module_factory.from_workflow_step( trans, step )
-                # Check for missing parameters
-                step.upgrade_messages = step.module.check_and_update_state()
-                # Any connected input needs to have value DummyDataset (these
-                # are not persisted so we need to do it every time)
-                step.module.add_dummy_datasets( connections=step.input_connections )
-                step.state = step.module.state
-                _update_step_parameters(step, param_map)
-                if step.tool_errors:
-                    trans.response.status = 400
-                    return "Workflow cannot be run because of validation errors in some steps: %s" % step_errors
-                if step.upgrade_messages:
-                    trans.response.status = 400
-                    return "Workflow cannot be run because of step upgrade messages: %s" % step.upgrade_messages
-            else:
-                # This is an input step. Make sure we have an available input.
-                if step.type == 'data_input' and str(step.id) not in ds_map:
-                    trans.response.status = 400
-                    return "Workflow cannot be run because an expected input step '%s' has no input dataset." % step.id
-                step.module = module_factory.from_workflow_step( trans, step )
-                step.state = step.module.get_runtime_state()
-
         # Run each step, connecting outputs to inputs
         replacement_dict = payload.get('replacement_params', {})

@@ -331,6 +264,7 @@
             target_history=history,
             replacement_dict=replacement_dict,
             ds_map=ds_map,
+            param_map=param_map,
         )

         outputs = invoke(
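The param_map that the API controller now hands through to the run config keeps the formats documented on _update_step_parameters. A hypothetical value showing the preferred by-step-id form next to the deprecated by-tool-id form (the ids and parameter names are invented for illustration; precedence between the two is worked through after the run.py diff below):

    param_map = {
        '3': {'lineNum': '10'},                       # preferred: keyed by step id
        'cat1': {'param': 'input2', 'value': 'abc'},  # deprecated: keyed by tool id, one param/value pair
    }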
- """ - param_dict = param_map.get(step.tool_id, {}).copy() - param_dict.update(param_map.get(str(step.id), {})) - if param_dict: - if 'param' in param_dict and 'value' in param_dict: - param_dict[param_dict['param']] = param_dict['value'] - step.state.inputs.update(param_dict) - - class WorkflowsAPIController(BaseAPIController, UsesStoredWorkflowMixin, UsesHistoryMixin): @web.expose_api @@ -290,40 +257,6 @@ trans.response.status = 400 return "Invalid Dataset '%s' Specified" % ds_map[k]['id'] - # Build the state for each step - for step in workflow.steps: - step_errors = None - input_connections_by_name = {} - for conn in step.input_connections: - input_name = conn.input_name - if not input_name in input_connections_by_name: - input_connections_by_name[input_name] = [] - input_connections_by_name[input_name].append(conn) - step.input_connections_by_name = input_connections_by_name - - if step.type == 'tool' or step.type is None: - step.module = module_factory.from_workflow_step( trans, step ) - # Check for missing parameters - step.upgrade_messages = step.module.check_and_update_state() - # Any connected input needs to have value DummyDataset (these - # are not persisted so we need to do it every time) - step.module.add_dummy_datasets( connections=step.input_connections ) - step.state = step.module.state - _update_step_parameters(step, param_map) - if step.tool_errors: - trans.response.status = 400 - return "Workflow cannot be run because of validation errors in some steps: %s" % step_errors - if step.upgrade_messages: - trans.response.status = 400 - return "Workflow cannot be run because of step upgrade messages: %s" % step.upgrade_messages - else: - # This is an input step. Make sure we have an available input. - if step.type == 'data_input' and str(step.id) not in ds_map: - trans.response.status = 400 - return "Workflow cannot be run because an expected input step '%s' has no input dataset." % step.id - step.module = module_factory.from_workflow_step( trans, step ) - step.state = step.module.get_runtime_state() - # Run each step, connecting outputs to inputs replacement_dict = payload.get('replacement_params', {}) @@ -331,6 +264,7 @@ target_history=history, replacement_dict=replacement_dict, ds_map=ds_map, + param_map=param_map, ) outputs = invoke( diff -r 4a7abc7892db4d6c9046b1ad46e8b04925b052e3 -r 439510f01e3b0850fd844cded14977076ec86501 lib/galaxy/workflow/run.py --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -6,6 +6,7 @@ from galaxy.tools.parameters.basic import DataToolParameter from galaxy.tools.parameters import visit_input_values from galaxy.util.odict import odict +from galaxy.workflow import modules class WorkflowRunConfig( object ): @@ -24,13 +25,18 @@ :param ds_map: Map from step ids to dict's containing HDA for these steps. :type ds_map: dict + + :param param_map: Override tool and/or step parameters (see documentation on + _update_step_parameters below). 
@@ -150,4 +164,74 @@
         replacement = outputs[ connection[ 0 ].output_step.id ][ connection[ 0 ].output_name ]
         return replacement

+    def _populate_modules( self ):
+        # Build the state for each step
+        for step in self.workflow.steps:
+            step_errors = None
+            input_connections_by_name = {}
+            for conn in step.input_connections:
+                input_name = conn.input_name
+                if not input_name in input_connections_by_name:
+                    input_connections_by_name[input_name] = []
+                input_connections_by_name[input_name].append(conn)
+            step.input_connections_by_name = input_connections_by_name
+
+            if step.type == 'tool' or step.type is None:
+                step.module = modules.module_factory.from_workflow_step( self.trans, step )
+                # Check for missing parameters
+                step.upgrade_messages = step.module.check_and_update_state()
+                # Any connected input needs to have value DummyDataset (these
+                # are not persisted so we need to do it every time)
+                step.module.add_dummy_datasets( connections=step.input_connections )
+                step.state = step.module.state
+                _update_step_parameters( step, self.param_map )
+                if step.tool_errors:
+                    message = "Workflow cannot be run because of validation errors in some steps: %s" % step_errors
+                    raise exceptions.MessageException( message )
+                if step.upgrade_messages:
+                    message = "Workflow cannot be run because of step upgrade messages: %s" % step.upgrade_messages
+                    raise exceptions.MessageException( message )
+            else:
+                # This is an input step. Make sure we have an available input.
+                if step.type == 'data_input' and str( step.id ) not in self.ds_map:
+                    message = "Workflow cannot be run because an expected input step '%s' has no input dataset." % step.id
+                    raise exceptions.MessageException( message )
+
+                step.module = modules.module_factory.from_workflow_step( self.trans, step )
+                step.state = step.module.get_runtime_state()
+
+
+def _update_step_parameters(step, param_map):
+    """
+    Update ``step`` parameters based on the user-provided ``param_map`` dict.
+
+    ``param_map`` should be structured as follows::
+
+      PARAM_MAP = {STEP_ID: PARAM_DICT, ...}
+      PARAM_DICT = {NAME: VALUE, ...}
+
+    For backwards compatibility, the following (deprecated) format is
+    also supported for ``param_map``::
+
+      PARAM_MAP = {TOOL_ID: PARAM_DICT, ...}
+
+    in which case PARAM_DICT affects all steps with the given tool id.
+    If both by-tool-id and by-step-id specifications are used, the
+    latter takes precedence.
+
+    Finally (again, for backwards compatibility), PARAM_DICT can also
+    be specified as::
+
+      PARAM_DICT = {'param': NAME, 'value': VALUE}
+
+    Note that this format allows only one parameter to be set per step.
+    """
+    param_dict = param_map.get(step.tool_id, {}).copy()
+    param_dict.update(param_map.get(str(step.id), {}))
+    if param_dict:
+        if 'param' in param_dict and 'value' in param_dict:
+            param_dict[param_dict['param']] = param_dict['value']
+        step.state.inputs.update(param_dict)
+
+
 __all__ = [ invoke, WorkflowRunConfig ]
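Because _update_step_parameters copies the by-tool-id entry first and then update()s it with the by-step-id entry, step-level overrides win key by key rather than replacing the tool-level dict wholesale. A worked example with hypothetical ids and parameter names:

    param_map = {'cat1': {'a': '1', 'b': '2'}, '7': {'b': '3'}}
    # For a step with tool_id 'cat1' and id 7:
    param_dict = param_map.get('cat1', {}).copy()  # {'a': '1', 'b': '2'}
    param_dict.update(param_map.get('7', {}))      # {'a': '1', 'b': '3'} - the step-id value for 'b' wins
    step.state.inputs.update(param_dict)           # both 'a' and 'b' are applied to the step's state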
Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--

This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.