7 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/fa92c5497232/ Changeset: fa92c5497232 User: jmchilton Date: 2014-09-10 17:51:10 Summary: Synchronize validation of workflows between web and API controllers. Reduces code duplication and does more correct checking of workflow step replacement parameters. More parameter checking functional tests. Affected #: 6 files diff -r d85d039b8eeb2781619683f62dddf0f4c20af773 -r fa92c5497232444ade0e7044184fb8ccf419177c lib/galaxy/workflow/run.py --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -231,10 +231,10 @@ # Build the state for each step module_injector = modules.WorkflowModuleInjector( self.trans ) for step in self.workflow.steps: - step_errors = module_injector.inject( step ) + step_args = self.param_map.get( step.id, {} ) + step_errors = module_injector.inject( step, step_args=step_args ) if step.type == 'tool' or step.type is None: - _update_step_parameters( step, self.param_map ) - if step.tool_errors: + if step_errors: message = "Workflow cannot be run because of validation errors in some steps: %s" % step_errors raise exceptions.MessageException( message ) if step.upgrade_messages: @@ -242,10 +242,4 @@ raise exceptions.MessageException( message ) -def _update_step_parameters(step, normalized_param_map): - param_dict = normalized_param_map.get(step.id, {}) - if param_dict: - step.state.inputs.update(param_dict) - - __all__ = [ invoke, WorkflowRunConfig ] diff -r d85d039b8eeb2781619683f62dddf0f4c20af773 -r fa92c5497232444ade0e7044184fb8ccf419177c test/api/helpers.py --- a/test/api/helpers.py +++ b/test/api/helpers.py @@ -131,8 +131,9 @@ elif "dataset" in kwds: dataset_id = kwds[ "dataset" ][ "id" ] else: + hid = kwds.get( "hid", -1 ) # If not hid, just grab last dataset dataset_contents = self.galaxy_interactor.get( contents_url ).json() - dataset_id = dataset_contents[ -1 ][ "id" ] + dataset_id = dataset_contents[ hid ][ "id" ] display_response = self.galaxy_interactor.get( "%s/%s/display" % ( contents_url, dataset_id ) ) assert display_response.status_code == 200 diff -r d85d039b8eeb2781619683f62dddf0f4c20af773 -r fa92c5497232444ade0e7044184fb8ccf419177c test/api/test_tools.py --- a/test/api/test_tools.py +++ b/test/api/test_tools.py @@ -109,6 +109,15 @@ output1_content = self.dataset_populator.get_history_dataset_content( history_id, dataset=output1 ) self.assertEqual( output1_content.strip(), "Cat1Test" ) + @skip_without_tool( "validation_default" ) + def test_validation( self ): + history_id = self.dataset_populator.new_history() + inputs = { + 'select_param': "\" ; echo \"moo", + } + response = self._run( "validation_default", history_id, inputs ) + self._assert_status_code_is( response, 400 ) + @skip_without_tool( "cat1" ) def test_run_cat1_with_two_inputs( self ): # Run tool with an multiple data parameter and grouping (repeat) diff -r d85d039b8eeb2781619683f62dddf0f4c20af773 -r fa92c5497232444ade0e7044184fb8ccf419177c test/api/test_workflow_validation_1.ga --- a/test/api/test_workflow_validation_1.ga +++ b/test/api/test_workflow_validation_1.ga @@ -23,7 +23,7 @@ "post_job_actions": {}, "tool_errors": null, "tool_id": "validation_default", - "tool_state": "{\"__page__\": 0, \"__rerun_remap_job_id__\": null, \"input1\": \"\\\"cow\\\"\", \"chromInfo\": \"\\\"/home/john/workspace/galaxy-central/tool-data/shared/ucsc/chrom/?.len\\\"\"}", + "tool_state": "{\"__page__\": 0, \"__rerun_remap_job_id__\": null, \"input1\": \"\\\"cow\\\"\", \"float_param\": \"8.0\", \"select_param\": 
\"\\\"opt1\\\"\", \"chromInfo\": \"\\\"/home/john/workspace/galaxy-central/tool-data/shared/ucsc/chrom/?.len\\\"\"}", "tool_version": null, "type": "tool", "user_outputs": [] diff -r d85d039b8eeb2781619683f62dddf0f4c20af773 -r fa92c5497232444ade0e7044184fb8ccf419177c test/api/test_workflows.py --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -480,18 +480,30 @@ @skip_without_tool( "validation_default" ) def test_parameter_substitution_validation( self ): + substitions = dict( input1="\" ; echo \"moo" ) + run_workflow_response, history_id = self._run_validation_workflow_with_substitions( substitions ) + + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + self.assertEquals("__dq__ X echo __dq__moo\n", self.dataset_populator.get_history_dataset_content( history_id, hid=1 ) ) + + @skip_without_tool( "validation_default" ) + def test_parameter_substitution_validation_value_errors_1( self ): + substitions = dict( select_param="\" ; echo \"moo" ) + run_workflow_response, history_id = self._run_validation_workflow_with_substitions( substitions ) + + self._assert_status_code_is( run_workflow_response, 400 ) + + def _run_validation_workflow_with_substitions( self, substitions ): workflow = self.workflow_populator.load_workflow_from_resource( "test_workflow_validation_1" ) uploaded_workflow_id = self.workflow_populator.create_workflow( workflow ) history_id = self.dataset_populator.new_history() workflow_request = dict( history="hist_id=%s" % history_id, workflow_id=uploaded_workflow_id, - parameters=dumps( dict( validation_default=dict( input1="\" ; echo \"moo" ) ) ) + parameters=dumps( dict( validation_default=substitions ) ) ) run_workflow_response = self._post( "workflows", data=workflow_request ) - self._assert_status_code_is( run_workflow_response, 200 ) - self.dataset_populator.wait_for_history( history_id, assert_ok=True ) - self.assertEquals("__dq__ X echo __dq__moo\n", self.dataset_populator.get_history_dataset_content( history_id ) ) + return run_workflow_response, history_id @skip_without_tool( "random_lines1" ) def test_run_replace_params_by_steps( self ): diff -r d85d039b8eeb2781619683f62dddf0f4c20af773 -r fa92c5497232444ade0e7044184fb8ccf419177c test/functional/tools/validation_default.xml --- a/test/functional/tools/validation_default.xml +++ b/test/functional/tools/validation_default.xml @@ -1,16 +1,27 @@ <tool id="validation_default" name="Validation (default)"><command> echo "$input1" > out1; + echo $float_param > out2; + echo $select_param > out3; </command><inputs> - <param name="input1" type="text" label="Concatenate Dataset" /> + <param name="input1" type="text" label="text input" /> + <param name="float_param" type="float" label="float input" value="8.0" /> + <param name="select_param" type="select" label="select_param"> + <option value="opt1">Option 1</option> + <option value="opt2">Option 2</option> + </param></inputs><outputs><data name="out_file1" from_work_dir="out1" /> + <data name="out_file2" from_work_dir="out2" /> + <data name="out_file3" from_work_dir="out3" /></outputs><tests><test><param name="input1" value="" ; echo "moo" /> + <param name="float_param" value="5" /> + <param name="select_param" value="opt1" /><output name="out_file1"><assert_contents><has_line line="__dq__ X echo __dq__moo" /> https://bitbucket.org/galaxy/galaxy-central/commits/144af99ca953/ Changeset: 144af99ca953 User: jmchilton Date: 2014-09-10 17:51:10 Summary: More workflow module commenting. This time add small comment about config form method. 
Affected #: 1 file diff -r fa92c5497232444ade0e7044184fb8ccf419177c -r 144af99ca9536f5924fae66c8fb372f150599f73 lib/galaxy/workflow/modules.py --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -96,6 +96,9 @@ pass def get_config_form( self ): + """ Render form that is embedded in workflow editor for modifying the + step state of a node. + """ raise TypeError( "Abstract method" ) def check_and_update_state( self ): https://bitbucket.org/galaxy/galaxy-central/commits/51799404c770/ Changeset: 51799404c770 User: jmchilton Date: 2014-09-10 17:51:10 Summary: Unify runtime state handling methods across tool and input modules. The abstract interface WorkflowModule declared three methods that WorkflowToolModule didn't implement, and logic for updating tool state wasn't restricted to the module itself. This introduces a new compute_state method that unifies all of that and includes some documentation. In addition to simplifying the interface that new modules must adhere to, this allows eliminating the switch on module type when injecting the transient state attribute into steps - moving that logic into the module itself. In my opinion this makes WorkflowModule closer to being an abstract interface - allowing components to uniformly reason about workflow modules as a black box (and not dispatch on type attributes). These changes did require adding add_dummy_datasets to the base class - and while the input modules don't need the ability to override it, my pause module downstream does, so I think it makes sense to place it on the interface. Affected #: 1 file diff -r 144af99ca9536f5924fae66c8fb372f150599f73 -r 51799404c770249f71ffd540c352011509f01329 lib/galaxy/workflow/modules.py --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -108,14 +108,15 @@ """ pass + def add_dummy_datasets( self, connections=None): + # Replaced connected inputs with DummyDataset values. + pass + ## ---- Run time --------------------------------------------------------- def get_runtime_inputs( self ): raise TypeError( "Abstract method" ) - def get_runtime_state( self ): - raise TypeError( "Abstract method" ) - def encode_runtime_state( self, trans, state ): """ Encode the runtime state (loaded from the stored step and populated via the WorkflowModuleInjector below) for use in a hidden @@ -127,10 +128,16 @@ """ raise TypeError( "Abstract method" ) - def decode_runtime_state( self, trans, string ): - raise TypeError( "Abstract method" ) + def compute_state( self, trans, step_updates=None ): + """ Recover the transient "state" attribute to populate corresponding + step with (currently this is always a DefaultToolState instance, + though I am not sure this is strictly nessecary). - def update_runtime_state( self, trans, state, values ): + If `step_updates` is `None`, this is likely for rendering the run form + for instance and no runtime properties are available and state must be + solely determined by step. If `step_updates` are available they describe + the runtime properties supplied by the workflow runner.
+ """ raise TypeError( "Abstract method" ) def execute( self, trans, state ): @@ -211,6 +218,17 @@ errors[ name ] = error return errors + def compute_state( self, trans, step_updates=None ): + if step_updates: + # Fix this for multiple inputs + state = self.decode_runtime_state( trans, step_updates.pop( "tool_state" ) ) + step_errors = self.update_runtime_state( trans, state, step_updates ) + else: + state = self.get_runtime_state() + step_errors = {} + + return state, step_errors + def execute( self, trans, state ): return None, dict( output=state.inputs['input']) @@ -509,6 +527,21 @@ def check_and_update_state( self ): return self.tool.check_and_update_param_values( self.state.inputs, self.trans, allow_workflow_parameters=True ) + def compute_state( self, trans, step_updates=None ): + # Warning: This method destructively modifies existing step state. + step_errors = None + state = self.state + if step_updates: + # Get the tool + tool = self.tool + # Get old errors + old_errors = state.inputs.pop( "__errors__", {} ) + # Update the state + step_errors = tool.update_state( trans, tool.inputs, state.inputs, step_updates, + update_only=True, old_errors=old_errors ) + else: + return state, step_errors + def add_dummy_datasets( self, connections=None): if connections: # Store onnections by input name @@ -619,36 +652,19 @@ module = step.module = module_factory.from_workflow_step( trans, step ) # Calculating step errors and state depends on whether step is a tool step or not. - if step.type == 'tool' or step.type is None: - if not module: - step.module = None - step.state = None - raise MissingToolException() + if not module: + step.module = None + step.state = None + raise MissingToolException() - # Fix any missing parameters - step.upgrade_messages = module.check_and_update_state() + # Fix any missing parameters + step.upgrade_messages = module.check_and_update_state() - # Any connected input needs to have value DummyDataset (these - # are not persisted so we need to do it every time) - module.add_dummy_datasets( connections=step.input_connections ) + # Any connected input needs to have value DummyDataset (these + # are not persisted so we need to do it every time) + module.add_dummy_datasets( connections=step.input_connections ) - state = module.state - step.state = state - if step_args is not None: - # Get the tool - tool = module.tool - # Get old errors - old_errors = state.inputs.pop( "__errors__", {} ) - # Update the state - step_errors = tool.update_state( trans, tool.inputs, step.state.inputs, step_args, - update_only=True, old_errors=old_errors ) - - else: - if step_args: - # Fix this for multiple inputs - state = step.state = module.decode_runtime_state( trans, step_args.pop( "tool_state" ) ) - step_errors = module.update_runtime_state( trans, state, step_args ) - else: - step.state = step.module.get_runtime_state() + state, step_errors = module.compute_state( trans, step_args ) + step.state = state return step_errors https://bitbucket.org/galaxy/galaxy-central/commits/952e1f153a2d/ Changeset: 952e1f153a2d User: jmchilton Date: 2014-09-10 17:51:10 Summary: Remove some old commented out client-side stuff. (Let me know if these were important.) 
Affected #: 1 file diff -r 51799404c770249f71ffd540c352011509f01329 -r 952e1f153a2d71673d71952f990f3e5b67495505 templates/webapps/galaxy/workflow/editor.mako --- a/templates/webapps/galaxy/workflow/editor.mako +++ b/templates/webapps/galaxy/workflow/editor.mako @@ -7,16 +7,6 @@ %></%def> -## <%def name="late_javascripts()"> -## <script type='text/javascript' src="${h.url_for('/static/scripts/galaxy.panels.js')}"></script> -## <script type="text/javascript"> -## ensure_dd_helper(); -## make_left_panel( $("#left"), $("#center"), $("#left-border" ) ); -## make_right_panel( $("#right"), $("#center"), $("#right-border" ) ); -## ## handle_minwidth_hint = rp.handle_minwidth_hint; -## </script> -## </%def> - <%def name="javascripts()"> ${parent.javascripts()} @@ -87,18 +77,6 @@ <style type="text/css"> body { margin: 0; padding: 0; overflow: hidden; } - /* Wider right panel */ - ## #center { right: 309px; } - ## #right-border { right: 300px; } - ## #right { width: 300px; } - ## /* Relative masthead size */ - ## #masthead { height: 2.5em; } - ## #masthead div.title { font-size: 1.8em; } - ## #left, #left-border, #center, #right-border, #right { - ## top: 2.5em; - ## margin-top: 7px; - ## } - #left { background: #C1C9E5 url(${h.url_for('/static/style/menu_bg.png')}) top repeat-x; } https://bitbucket.org/galaxy/galaxy-central/commits/3656bf3ec7c4/ Changeset: 3656bf3ec7c4 User: jmchilton Date: 2014-09-10 17:51:10 Summary: Refactor logic for tracking workflow progress out of WorkflowInvoker. Creates a new class WorkflowProgress. This class achieves two objectives: - I am trying to move specialized per-module-type logic out of WorkflowInvoker and into the modules themselves - making it easier to add new modules and because it is better OOP design (invoker shouldn't be dispatching on .type - the modules should provide a consistent interface for invoking themselves). WorkflowProgress will give something that WorkflowInvoker can pass along to the modules to allow them to find their connections and record their outputs. - Downstream, in the workflow scheduling branch, I am placing logic in here for rebuilding the in-memory state of the workflow progress when re-evaluating progress. Affected #: 3 files diff -r 952e1f153a2d71673d71952f990f3e5b67495505 -r 3656bf3ec7c487b8635ceb20de1ae5fe8f2e4f87 lib/galaxy/webapps/galaxy/api/workflows.py --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -228,6 +228,7 @@ trans=trans, workflow=workflow, workflow_run_config=run_config, + populate_state=True, ) trans.sa_session.flush() diff -r 952e1f153a2d71673d71952f990f3e5b67495505 -r 3656bf3ec7c487b8635ceb20de1ae5fe8f2e4f87 lib/galaxy/workflow/modules.py --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -11,6 +11,7 @@ from elementtree.ElementTree import Element import galaxy.tools +from galaxy import exceptions from galaxy.web.framework import formbuilder from galaxy.jobs.actions.post import ActionBox from galaxy.model import PostJobAction @@ -668,3 +669,20 @@ step.state = state return step_errors + + +def populate_module_and_state( trans, workflow, param_map ): + """ Used by API but not web controller, walks through a workflow's steps + and populates transient module and state attributes on each.
+ """ + module_injector = WorkflowModuleInjector( trans ) + for step in workflow.steps: + step_args = param_map.get( step.id, {} ) + step_errors = module_injector.inject( step, step_args=step_args ) + if step.type == 'tool' or step.type is None: + if step_errors: + message = "Workflow cannot be run because of validation errors in some steps: %s" % step_errors + raise exceptions.MessageException( message ) + if step.upgrade_messages: + message = "Workflow cannot be run because of step upgrade messages: %s" % step.upgrade_messages + raise exceptions.MessageException( message ) diff -r 952e1f153a2d71673d71952f990f3e5b67495505 -r 3656bf3ec7c487b8635ceb20de1ae5fe8f2e4f87 lib/galaxy/workflow/run.py --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -21,9 +21,12 @@ log = logging.getLogger( __name__ ) -def invoke( trans, workflow, workflow_run_config ): +def invoke( trans, workflow, workflow_run_config, populate_state=False ): """ Run the supplied workflow in the supplied target_history. """ + if populate_state: + modules.populate_module_and_state( trans, workflow, workflow_run_config.param_map ) + return WorkflowInvoker( trans, workflow, @@ -36,29 +39,21 @@ def __init__( self, trans, workflow, workflow_run_config ): self.trans = trans self.workflow = workflow + workflow_invocation = model.WorkflowInvocation() + workflow_invocation.workflow = self.workflow + self.workflow_invocation = workflow_invocation self.target_history = workflow_run_config.target_history self.replacement_dict = workflow_run_config.replacement_dict self.copy_inputs_to_history = workflow_run_config.copy_inputs_to_history - self.inputs_by_step_id = workflow_run_config.inputs - self.param_map = workflow_run_config.param_map + self.progress = WorkflowProgress( self.workflow_invocation, workflow_run_config.inputs ) - self.outputs = odict() # TODO: Attach to actual model object and persist someday... self.invocation_uuid = uuid.uuid1().hex def invoke( self ): - workflow_invocation = model.WorkflowInvocation() - workflow_invocation.workflow = self.workflow - - # Web controller will populate state on each step before calling - # invoke but not API controller. More work should be done to further - # harmonize these methods going forward if possible - if possible - # moving more web controller logic here. - state_populated = not self.workflow.steps or hasattr( self.workflow.steps[ 0 ], "state" ) - if not state_populated: - self._populate_state( ) - - for step in self.workflow.steps: + workflow_invocation = self.workflow_invocation + remaining_steps = self.progress.remaining_steps() + for step in remaining_steps: jobs = self._invoke_step( step ) for job in util.listify( jobs ): # Record invocation @@ -72,7 +67,7 @@ # Not flushing in here, because web controller may create multiple # invocations. - return self.outputs + return self.progress.outputs def _invoke_step( self, step ): if step.type == 'tool' or step.type is None: @@ -84,7 +79,6 @@ def _execute_tool_step( self, step ): trans = self.trans - outputs = self.outputs tool = trans.app.toolbox.get_tool( step.tool_id ) tool_state = step.state @@ -119,7 +113,7 @@ # If collection - just use element model object. 
replacement = iteration_elements[ prefixed_name ] else: - replacement = self._replacement_for_input( input, prefixed_name, step ) + replacement = self.progress.replacement_for_tool_input( step, input, prefixed_name ) return replacement try: # Replace DummyDatasets with historydatasetassociations @@ -139,10 +133,11 @@ workflow_invocation_uuid=self.invocation_uuid ) if collection_info: - outputs[ step.id ] = dict( execution_tracker.created_collections ) + step_outputs = dict( execution_tracker.created_collections ) + else: - outputs[ step.id ] = dict( execution_tracker.output_datasets ) - + step_outputs = dict( execution_tracker.output_datasets ) + self.progress.set_step_outputs( step, step_outputs ) jobs = execution_tracker.successful_jobs for job in jobs: self._handle_post_job_actions( step, job ) @@ -154,13 +149,13 @@ def callback( input, value, prefixed_name, prefixed_label ): is_data_param = isinstance( input, DataToolParameter ) if is_data_param and not input.multiple: - data = self._replacement_for_input( input, prefixed_name, step ) + data = self.progress.replacement_for_tool_input( step, input, prefixed_name ) if isinstance( data, model.HistoryDatasetCollectionAssociation ): collections_to_match.add( prefixed_name, data ) is_data_collection_param = isinstance( input, DataCollectionToolParameter ) if is_data_collection_param and not input.multiple: - data = self._replacement_for_input( input, prefixed_name, step ) + data = self.progress.replacement_for_tool_input( step, input, prefixed_name ) history_query = input._history_query( self.trans ) if history_query.can_map_over( data ): collections_to_match.add( prefixed_name, data, subcollection_type=input.collection_type ) @@ -170,29 +165,25 @@ def _execute_input_step( self, step ): trans = self.trans - outputs = self.outputs - job, out_data = step.module.execute( trans, step.state ) - outputs[ step.id ] = out_data + job, step_outputs = step.module.execute( trans, step.state ) # Web controller may set copy_inputs_to_history, API controller always sets # inputs. 
if self.copy_inputs_to_history: - for input_dataset_hda in out_data.values(): + for input_dataset_hda in step_outputs.values(): content_type = input_dataset_hda.history_content_type if content_type == "dataset": new_hda = input_dataset_hda.copy( copy_children=True ) self.target_history.add_dataset( new_hda ) - outputs[ step.id ][ 'input_ds_copy' ] = new_hda + step_outputs[ 'input_ds_copy' ] = new_hda elif content_type == "dataset_collection": new_hdca = input_dataset_hda.copy() self.target_history.add_dataset_collection( new_hdca ) - outputs[ step.id ][ 'input_ds_copy' ] = new_hdca + step_outputs[ 'input_ds_copy' ] = new_hdca else: raise Exception("Unknown history content encountered") - if self.inputs_by_step_id: - outputs[ step.id ][ 'output' ] = self.inputs_by_step_id[ step.id ] - + self.progress.set_outputs_for_input( step, step_outputs ) return job def _handle_post_job_actions( self, step, job ): @@ -205,7 +196,20 @@ else: job.add_post_job_action( pja ) - def _replacement_for_input( self, input, prefixed_name, step ): + +class WorkflowProgress( object ): + + def __init__( self, workflow_invocation, inputs_by_step_id ): + self.outputs = odict() + self.workflow_invocation = workflow_invocation + self.inputs_by_step_id = inputs_by_step_id + + def remaining_steps(self): + steps = self.workflow_invocation.workflow.steps + + return steps + + def replacement_for_tool_input( self, step, input, prefixed_name ): """ For given workflow 'step' that has had input_connections_by_name populated fetch the actual runtime input for the given tool 'input'. """ @@ -213,7 +217,7 @@ if prefixed_name in step.input_connections_by_name: connection = step.input_connections_by_name[ prefixed_name ] if input.multiple: - replacement = [ self._replacement_for_connection( c ) for c in connection ] + replacement = [ self.replacement_for_connection( c ) for c in connection ] # If replacement is just one dataset collection, replace tool # input with dataset collection - tool framework will extract # datasets properly. 
@@ -221,25 +225,21 @@ if isinstance( replacement[ 0 ], model.HistoryDatasetCollectionAssociation ): replacement = replacement[ 0 ] else: - replacement = self._replacement_for_connection( connection[ 0 ] ) + replacement = self.replacement_for_connection( connection[ 0 ] ) return replacement - def _replacement_for_connection( self, connection ): - return self.outputs[ connection.output_step.id ][ connection.output_name ] + def replacement_for_connection( self, connection ): + step_outputs = self.outputs[ connection.output_step.id ] + return step_outputs[ connection.output_name ] - def _populate_state( self ): - # Build the state for each step - module_injector = modules.WorkflowModuleInjector( self.trans ) - for step in self.workflow.steps: - step_args = self.param_map.get( step.id, {} ) - step_errors = module_injector.inject( step, step_args=step_args ) - if step.type == 'tool' or step.type is None: - if step_errors: - message = "Workflow cannot be run because of validation errors in some steps: %s" % step_errors - raise exceptions.MessageException( message ) - if step.upgrade_messages: - message = "Workflow cannot be run because of step upgrade messages: %s" % step.upgrade_messages - raise exceptions.MessageException( message ) + def set_outputs_for_input( self, step, outputs={} ): + if self.inputs_by_step_id: + outputs[ 'output' ] = self.inputs_by_step_id[ step.id ] + + self.set_step_outputs( step, outputs ) + + def set_step_outputs(self, step, outputs): + self.outputs[ step.id ] = outputs __all__ = [ invoke, WorkflowRunConfig ] https://bitbucket.org/galaxy/galaxy-central/commits/bbe9be042b34/ Changeset: bbe9be042b34 User: jmchilton Date: 2014-09-10 17:51:10 Summary: Unify execute method across tool and input workflow modules. Previously the tool module class did not contain this method (though it was abstract in the base class) and extra logic for dealing with tools was located outside of the module itself. Moving everything into the module (made possible by the previously committed WorkflowProgress class and attaching some properties to workflow invocations and sending them in) means that now the workflow invocation logic can uniformly deal with modules and doesn't need to dispatch on type. This should allow extending the capabilities of the workflow invoker and adding new modules independently. Affected #: 2 files diff -r 3656bf3ec7c487b8635ceb20de1ae5fe8f2e4f87 -r bbe9be042b34268c649b42c8c85d238149e910f0 lib/galaxy/workflow/modules.py --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -12,11 +12,15 @@ import galaxy.tools from galaxy import exceptions +from galaxy import model +from galaxy.dataset_collections import matching from galaxy.web.framework import formbuilder from galaxy.jobs.actions.post import ActionBox from galaxy.model import PostJobAction from galaxy.tools.parameters import check_param, DataToolParameter, DummyDataset, RuntimeValue, visit_input_values from galaxy.tools.parameters import DataCollectionToolParameter +from galaxy.tools.parameters.wrapped import make_dict_copy +from galaxy.tools.execute import execute from galaxy.util.bunch import Bunch from galaxy.util import odict from galaxy.util.json import loads @@ -141,7 +145,11 @@ """ raise TypeError( "Abstract method" ) - def execute( self, trans, state ): + def execute( self, trans, progress, invocation, step ): + """ Execute the given workflow step in the given workflow invocation. + Use the supplied workflow progress object to track outputs, find + inputs, etc...
+ """ raise TypeError( "Abstract method" ) @@ -230,8 +238,26 @@ return state, step_errors - def execute( self, trans, state ): - return None, dict( output=state.inputs['input']) + def execute( self, trans, progress, invocation, step ): + job, step_outputs = None, dict( output=step.state.inputs['input']) + + # Web controller may set copy_inputs_to_history, API controller always sets + # inputs. + if invocation.copy_inputs_to_history: + for input_dataset_hda in step_outputs.values(): + content_type = input_dataset_hda.history_content_type + if content_type == "dataset": + new_hda = input_dataset_hda.copy( copy_children=True ) + invocation.history.add_dataset( new_hda ) + step_outputs[ 'input_ds_copy' ] = new_hda + elif content_type == "dataset_collection": + new_hdca = input_dataset_hda.copy() + invocation.history.add_dataset_collection( new_hdca ) + step_outputs[ 'input_ds_copy' ] = new_hdca + else: + raise Exception("Unknown history content encountered") + progress.set_outputs_for_input( step, step_outputs ) + return job class InputDataModule( InputModule ): @@ -540,8 +566,100 @@ # Update the state step_errors = tool.update_state( trans, tool.inputs, state.inputs, step_updates, update_only=True, old_errors=old_errors ) + return state, step_errors + + def execute( self, trans, progress, invocation, step ): + tool = trans.app.toolbox.get_tool( step.tool_id ) + tool_state = step.state + + collections_to_match = self._find_collections_to_match( tool, progress, step ) + # Have implicit collections... + if collections_to_match.has_collections(): + collection_info = self.trans.app.dataset_collections_service.match_collections( collections_to_match ) else: - return state, step_errors + collection_info = None + + param_combinations = [] + if collection_info: + iteration_elements_iter = collection_info.slice_collections() + else: + iteration_elements_iter = [ None ] + + for iteration_elements in iteration_elements_iter: + execution_state = tool_state.copy() + # TODO: Move next step into copy() + execution_state.inputs = make_dict_copy( execution_state.inputs ) + + # Connect up + def callback( input, value, prefixed_name, prefixed_label ): + replacement = None + if isinstance( input, DataToolParameter ) or isinstance( input, DataCollectionToolParameter ): + if iteration_elements and prefixed_name in iteration_elements: + if isinstance( input, DataToolParameter ): + # Pull out dataset instance from element. + replacement = iteration_elements[ prefixed_name ].dataset_instance + else: + # If collection - just use element model object. + replacement = iteration_elements[ prefixed_name ] + else: + replacement = progress.replacement_for_tool_input( step, input, prefixed_name ) + return replacement + try: + # Replace DummyDatasets with historydatasetassociations + visit_input_values( tool.inputs, execution_state.inputs, callback ) + except KeyError, k: + message_template = "Error due to input mapping of '%s' in '%s'. A common cause of this is conditional outputs that cannot be determined until runtime, please review your workflow." 
+ message = message_template % (tool.name, k.message) + raise exceptions.MessageException( message ) + param_combinations.append( execution_state.inputs ) + + execution_tracker = execute( + trans=self.trans, + tool=tool, + param_combinations=param_combinations, + history=invocation.history, + collection_info=collection_info, + workflow_invocation_uuid=invocation.uuid + ) + if collection_info: + step_outputs = dict( execution_tracker.created_collections ) + else: + step_outputs = dict( execution_tracker.output_datasets ) + progress.set_step_outputs( step, step_outputs ) + jobs = execution_tracker.successful_jobs + for job in jobs: + self._handle_post_job_actions( step, job, invocation.replacement_dict ) + return jobs + + def _find_collections_to_match( self, tool, progress, step ): + collections_to_match = matching.CollectionsToMatch() + + def callback( input, value, prefixed_name, prefixed_label ): + is_data_param = isinstance( input, DataToolParameter ) + if is_data_param and not input.multiple: + data = progress.replacement_for_tool_input( step, input, prefixed_name ) + if isinstance( data, model.HistoryDatasetCollectionAssociation ): + collections_to_match.add( prefixed_name, data ) + + is_data_collection_param = isinstance( input, DataCollectionToolParameter ) + if is_data_collection_param and not input.multiple: + data = progress.replacement_for_tool_input( step, input, prefixed_name ) + history_query = input._history_query( self.trans ) + if history_query.can_map_over( data ): + collections_to_match.add( prefixed_name, data, subcollection_type=input.collection_type ) + + visit_input_values( tool.inputs, step.state.inputs, callback ) + return collections_to_match + + def _handle_post_job_actions( self, step, job, replacement_dict ): + # Create new PJA associations with the created job, to be run on completion. + # PJA Parameter Replacement (only applies to immediate actions-- rename specifically, for now) + # Pass along replacement dict with the execution of the PJA so we don't have to modify the object. 
+ for pja in step.post_job_actions: + if pja.action_type in ActionBox.immediate_actions: + ActionBox.execute( self.trans.app, self.trans.sa_session, pja, job, replacement_dict ) + else: + job.add_post_job_action( pja ) def add_dummy_datasets( self, connections=None): if connections: diff -r 3656bf3ec7c487b8635ceb20de1ae5fe8f2e4f87 -r bbe9be042b34268c649b42c8c85d238149e910f0 lib/galaxy/workflow/run.py --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -1,18 +1,8 @@ import uuid from galaxy import model -from galaxy import exceptions from galaxy import util -from galaxy.dataset_collections import matching - -from galaxy.jobs.actions.post import ActionBox - -from galaxy.tools.parameters.basic import DataToolParameter -from galaxy.tools.parameters.basic import DataCollectionToolParameter -from galaxy.tools.parameters import visit_input_values -from galaxy.tools.parameters.wrapped import make_dict_copy -from galaxy.tools.execute import execute from galaxy.util.odict import odict from galaxy.workflow import modules from galaxy.workflow.run_request import WorkflowRunConfig @@ -42,13 +32,16 @@ workflow_invocation = model.WorkflowInvocation() workflow_invocation.workflow = self.workflow self.workflow_invocation = workflow_invocation - self.target_history = workflow_run_config.target_history - self.replacement_dict = workflow_run_config.replacement_dict - self.copy_inputs_to_history = workflow_run_config.copy_inputs_to_history self.progress = WorkflowProgress( self.workflow_invocation, workflow_run_config.inputs ) - # TODO: Attach to actual model object and persist someday... - self.invocation_uuid = uuid.uuid1().hex + invocation_uuid = uuid.uuid1().hex + + # In one way or another, following attributes will become persistent + # so they are available during delayed/revisited workflow scheduling. + self.workflow_invocation.uuid = invocation_uuid + self.workflow_invocation.history = workflow_run_config.target_history + self.workflow_invocation.copy_inputs_to_history = workflow_run_config.copy_inputs_to_history + self.workflow_invocation.replacement_dict = workflow_run_config.replacement_dict def invoke( self ): workflow_invocation = self.workflow_invocation @@ -70,132 +63,9 @@ return self.progress.outputs def _invoke_step( self, step ): - if step.type == 'tool' or step.type is None: - jobs = self._execute_tool_step( step ) - else: - jobs = self._execute_input_step( step ) - + jobs = step.module.execute( self.trans, self.progress, self.workflow_invocation, step ) return jobs - def _execute_tool_step( self, step ): - trans = self.trans - - tool = trans.app.toolbox.get_tool( step.tool_id ) - tool_state = step.state - - collections_to_match = self._find_collections_to_match( tool, step ) - # Have implicit collections... 
- if collections_to_match.has_collections(): - collection_info = self.trans.app.dataset_collections_service.match_collections( collections_to_match ) - else: - collection_info = None - - param_combinations = [] - if collection_info: - iteration_elements_iter = collection_info.slice_collections() - else: - iteration_elements_iter = [ None ] - - for iteration_elements in iteration_elements_iter: - execution_state = tool_state.copy() - # TODO: Move next step into copy() - execution_state.inputs = make_dict_copy( execution_state.inputs ) - - # Connect up - def callback( input, value, prefixed_name, prefixed_label ): - replacement = None - if isinstance( input, DataToolParameter ) or isinstance( input, DataCollectionToolParameter ): - if iteration_elements and prefixed_name in iteration_elements: - if isinstance( input, DataToolParameter ): - # Pull out dataset instance from element. - replacement = iteration_elements[ prefixed_name ].dataset_instance - else: - # If collection - just use element model object. - replacement = iteration_elements[ prefixed_name ] - else: - replacement = self.progress.replacement_for_tool_input( step, input, prefixed_name ) - return replacement - try: - # Replace DummyDatasets with historydatasetassociations - visit_input_values( tool.inputs, execution_state.inputs, callback ) - except KeyError, k: - message_template = "Error due to input mapping of '%s' in '%s'. A common cause of this is conditional outputs that cannot be determined until runtime, please review your workflow." - message = message_template % (tool.name, k.message) - raise exceptions.MessageException( message ) - param_combinations.append( execution_state.inputs ) - - execution_tracker = execute( - trans=self.trans, - tool=tool, - param_combinations=param_combinations, - history=self.target_history, - collection_info=collection_info, - workflow_invocation_uuid=self.invocation_uuid - ) - if collection_info: - step_outputs = dict( execution_tracker.created_collections ) - - else: - step_outputs = dict( execution_tracker.output_datasets ) - self.progress.set_step_outputs( step, step_outputs ) - jobs = execution_tracker.successful_jobs - for job in jobs: - self._handle_post_job_actions( step, job ) - return jobs - - def _find_collections_to_match( self, tool, step ): - collections_to_match = matching.CollectionsToMatch() - - def callback( input, value, prefixed_name, prefixed_label ): - is_data_param = isinstance( input, DataToolParameter ) - if is_data_param and not input.multiple: - data = self.progress.replacement_for_tool_input( step, input, prefixed_name ) - if isinstance( data, model.HistoryDatasetCollectionAssociation ): - collections_to_match.add( prefixed_name, data ) - - is_data_collection_param = isinstance( input, DataCollectionToolParameter ) - if is_data_collection_param and not input.multiple: - data = self.progress.replacement_for_tool_input( step, input, prefixed_name ) - history_query = input._history_query( self.trans ) - if history_query.can_map_over( data ): - collections_to_match.add( prefixed_name, data, subcollection_type=input.collection_type ) - - visit_input_values( tool.inputs, step.state.inputs, callback ) - return collections_to_match - - def _execute_input_step( self, step ): - trans = self.trans - - job, step_outputs = step.module.execute( trans, step.state ) - - # Web controller may set copy_inputs_to_history, API controller always sets - # inputs. 
- if self.copy_inputs_to_history: - for input_dataset_hda in step_outputs.values(): - content_type = input_dataset_hda.history_content_type - if content_type == "dataset": - new_hda = input_dataset_hda.copy( copy_children=True ) - self.target_history.add_dataset( new_hda ) - step_outputs[ 'input_ds_copy' ] = new_hda - elif content_type == "dataset_collection": - new_hdca = input_dataset_hda.copy() - self.target_history.add_dataset_collection( new_hdca ) - step_outputs[ 'input_ds_copy' ] = new_hdca - else: - raise Exception("Unknown history content encountered") - self.progress.set_outputs_for_input( step, step_outputs ) - return job - - def _handle_post_job_actions( self, step, job ): - # Create new PJA associations with the created job, to be run on completion. - # PJA Parameter Replacement (only applies to immediate actions-- rename specifically, for now) - # Pass along replacement dict with the execution of the PJA so we don't have to modify the object. - for pja in step.post_job_actions: - if pja.action_type in ActionBox.immediate_actions: - ActionBox.execute( self.trans.app, self.trans.sa_session, pja, job, self.replacement_dict ) - else: - job.add_post_job_action( pja ) - class WorkflowProgress( object ): https://bitbucket.org/galaxy/galaxy-central/commits/f68f8c8f147b/ Changeset: f68f8c8f147b User: jmchilton Date: 2014-09-10 17:51:10 Summary: API functional test for workflow 'replacement_params' and PJA renaming. Affected #: 2 files diff -r bbe9be042b34268c649b42c8c85d238149e910f0 -r f68f8c8f147b863ec968475216cc9a72044a722b test/api/helpers.py --- a/test/api/helpers.py +++ b/test/api/helpers.py @@ -120,24 +120,38 @@ return tool_response.json() def get_history_dataset_content( self, history_id, wait=True, **kwds ): + dataset_id = self.__history_dataset_id( history_id, wait=wait, **kwds ) + display_response = self.__get_contents_request( history_id, "/%s/display" % dataset_id ) + assert display_response.status_code == 200, display_response.content + return display_response.content + + def get_history_dataset_details( self, history_id, **kwds ): + dataset_id = self.__history_dataset_id( history_id, **kwds ) + details_response = self.__get_contents_request( history_id, "/%s" % dataset_id ) + assert details_response.status_code == 200 + return details_response.json() + + def __history_dataset_id( self, history_id, wait=True, **kwds ): if wait: assert_ok = kwds.get( "assert_ok", True ) self.wait_for_history( history_id, assert_ok=assert_ok ) # kwds should contain a 'dataset' object response, a 'dataset_id' or # the last dataset in the history will be fetched. 
- contents_url = "histories/%s/contents" % history_id if "dataset_id" in kwds: dataset_id = kwds[ "dataset_id" ] elif "dataset" in kwds: dataset_id = kwds[ "dataset" ][ "id" ] else: hid = kwds.get( "hid", -1 ) # If not hid, just grab last dataset - dataset_contents = self.galaxy_interactor.get( contents_url ).json() - dataset_id = dataset_contents[ hid ][ "id" ] + dataset_contents = self.__get_contents_request( history_id ).json() + dataset_id = dataset_contents[ hid - 1 ][ "id" ] + return dataset_id - display_response = self.galaxy_interactor.get( "%s/%s/display" % ( contents_url, dataset_id ) ) - assert display_response.status_code == 200 - return display_response.content + def __get_contents_request( self, history_id, suffix=""): + url = "histories/%s/contents" % history_id + if suffix: + url = "%s%s" % ( url, suffix ) + return self.galaxy_interactor.get( url ) class WorkflowPopulator( object ): @@ -154,7 +168,7 @@ tool_step[ "post_job_actions" ][ "RenameDatasetActionout_file1" ] = dict( action_type="RenameDatasetAction", output_name="out_file1", - action_arguments=dict( newname="the_new_name" ), + action_arguments=dict( newname="foo ${replaceme}" ), ) return workflow diff -r bbe9be042b34268c649b42c8c85d238149e910f0 -r f68f8c8f147b863ec968475216cc9a72044a722b test/api/test_workflows.py --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -467,6 +467,16 @@ assert n == expected_len, "Expected %d steps of type %s, found %d" % ( expected_len, type, n ) return sorted( steps, key=operator.itemgetter("id") ) + @skip_without_tool( "cat1" ) + def test_run_with_pja( self ): + workflow = self.workflow_populator.load_workflow( name="test_for_pja_run", add_pja=True ) + workflow_request, history_id = self._setup_workflow_run( workflow, inputs_by='step_index' ) + workflow_request[ "replacement_params" ] = dumps( dict( replaceme="was replaced" ) ) + run_workflow_response = self._post( "workflows", data=workflow_request ) + self._assert_status_code_is( run_workflow_response, 200 ) + content = self.dataset_populator.get_history_dataset_details( history_id, wait=True, assert_ok=True ) + assert content[ "name" ] == "foo was replaced" + @skip_without_tool( "random_lines1" ) def test_run_replace_params_by_tool( self ): workflow_request, history_id = self._setup_random_x2_workflow( "test_for_replace_tool_params" ) Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.