1 new commit in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/commits/03955af11756/
Changeset:   03955af11756
User:        jmchilton
Date:        2015-01-05 20:38:27+00:00
Summary:     Allow specification of post job actions at workflow invocation.

Feature requested by Kyle. Implemented only in the API at this point - it is not clear this feature is valuable to UI consumers.

Pass in PJAs along with the step parameters map, but keyed on __POST_JOB_ACTIONS__. The JSON definition is the same as when defining a PJA in the workflow definition JSON.

Includes test cases for normal use and for use after delayed workflow steps have been evaluated by the new workflow scheduling machinery.
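For API consumers, the request shape looks roughly like the following sketch (not part of the commit; it mirrors the test case added below, and the step UUID is a hypothetical placeholder):

    from json import dumps

    # Runtime PJAs ride along in the existing "parameters" map, keyed per step
    # (here by a step UUID) under the reserved __POST_JOB_ACTIONS__ key.
    step_uuid = "6fb17d0c-b42a-4057-9db5-ec344e384a46"  # hypothetical
    pja_map = {
        "RenameDatasetActionout_file1": dict(
            action_type="RenameDatasetAction",
            output_name="out_file1",
            action_arguments=dict( newname="foo ${replaceme}" ),
        )
    }
    workflow_request = {
        "parameters": dumps( { step_uuid: { "__POST_JOB_ACTIONS__": pja_map } } ),
        "replacement_params": dumps( dict( replaceme="was replaced" ) ),
    }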
Affected #:  3 files

diff -r 3fc8ce62285bc3df5b934439e624f2d56cb062cf -r 03955af117563248e950ec97d151c4a4615eaa58 lib/galaxy/workflow/modules.py
--- a/lib/galaxy/workflow/modules.py
+++ b/lib/galaxy/workflow/modules.py
@@ -29,6 +29,13 @@
 
 log = logging.getLogger( __name__ )
 
+# Key into Tool state to describe invocation-specific runtime properties.
+RUNTIME_STEP_META_STATE_KEY = "__STEP_META_STATE__"
+# Key into step runtime state dict describing invocation-specific post job
+# actions (i.e. PJAs specified at runtime on top of the workflow-wide defined
+# ones).
+RUNTIME_POST_JOB_ACTIONS_KEY = "__POST_JOB_ACTIONS__"
+
 
 class WorkflowModule( object ):
 
@@ -472,6 +479,7 @@
         self.tool_id = tool_id
         self.tool = trans.app.toolbox.get_tool( tool_id, tool_version=tool_version )
         self.post_job_actions = {}
+        self.runtime_post_job_actions = {}
         self.workflow_outputs = []
         self.state = None
         self.version_changes = []
@@ -556,6 +564,9 @@
         state = galaxy.tools.DefaultToolState()
         app = self.trans.app
         state.decode( runtime_state, self.tool, app, secure=False )
+        state_dict = loads( runtime_state )
+        if RUNTIME_STEP_META_STATE_KEY in state_dict:
+            self.__restore_step_meta_runtime_state( loads( state_dict[ RUNTIME_STEP_META_STATE_KEY ] ) )
         return state
 
     def normalize_runtime_state( self, runtime_state ):
@@ -572,16 +583,19 @@
         step.tool_inputs = None
         step.tool_errors = self.errors
         for k, v in self.post_job_actions.iteritems():
-            # Must have action_type, step.  output and a_args are optional.
-            if 'output_name' in v:
-                output_name = v['output_name']
-            else:
-                output_name = None
-            if 'action_arguments' in v:
-                action_arguments = v['action_arguments']
-            else:
-                action_arguments = None
-            self.trans.sa_session.add(PostJobAction(v['action_type'], step, output_name, action_arguments))
+            pja = self.__to_pja( k, v, step )
+            self.trans.sa_session.add( pja )
+
+    def __to_pja( self, key, value, step ):
+        if 'output_name' in value:
+            output_name = value['output_name']
+        else:
+            output_name = None
+        if 'action_arguments' in value:
+            action_arguments = value['action_arguments']
+        else:
+            action_arguments = None
+        return PostJobAction(value['action_type'], step, output_name, action_arguments)
 
     def get_name( self ):
         if self.tool:
@@ -677,7 +691,8 @@
             tool=self.tool,
             values=self.state.inputs,
             errors=( self.errors or {} )
         )
 
     def encode_runtime_state( self, trans, state ):
-        return state.encode( self.tool, self.trans.app )
+        encoded = state.encode( self.tool, self.trans.app )
+        return encoded
 
     def update_state( self, incoming ):
         # Build a callback that handles setting an input to be required at
@@ -712,26 +727,46 @@
         self.errors = errors or None
 
     def check_and_update_state( self ):
-        return self.tool.check_and_update_param_values( self.state.inputs, self.trans, allow_workflow_parameters=True )
+        inputs = self.state.inputs
+        return self.tool.check_and_update_param_values( inputs, self.trans, allow_workflow_parameters=True )
 
     def compute_runtime_state( self, trans, step_updates=None, source="html" ):
         # Warning: This method destructively modifies existing step state.
         step_errors = None
         state = self.state
+        self.runtime_post_job_actions = {}
         if step_updates:
             # Get the tool
             tool = self.tool
             # Get old errors
             old_errors = state.inputs.pop( "__errors__", {} )
             # Update the state
+            self.runtime_post_job_actions = step_updates.get(RUNTIME_POST_JOB_ACTIONS_KEY, {})
             step_errors = tool.update_state( trans, tool.inputs, state.inputs, step_updates,
                                              update_only=True, old_errors=old_errors, source=source )
+            step_metadata_runtime_state = self.__step_meta_runtime_state()
+            if step_metadata_runtime_state:
+                state.inputs[ RUNTIME_STEP_META_STATE_KEY ] = step_metadata_runtime_state
         return state, step_errors
 
+    def __step_meta_runtime_state( self ):
+        """ Build a dictionary of meta step runtime state (state about the
+        workflow step itself - not the tool state) to be serialized with the
+        Tool state.
+        """
+        return { RUNTIME_POST_JOB_ACTIONS_KEY: self.runtime_post_job_actions }
+
+    def __restore_step_meta_runtime_state( self, step_runtime_state ):
+        if RUNTIME_POST_JOB_ACTIONS_KEY in step_runtime_state:
+            self.runtime_post_job_actions = step_runtime_state[ RUNTIME_POST_JOB_ACTIONS_KEY ]
+
     def execute( self, trans, progress, invocation, step ):
         tool = trans.app.toolbox.get_tool( step.tool_id, tool_version=step.tool_version )
         tool_state = step.state
-
+        # Not strictly needed - but keep Tool state clean by stripping runtime
+        # metadata parameters from it.
+        if RUNTIME_STEP_META_STATE_KEY in tool_state.inputs:
+            del tool_state.inputs[ RUNTIME_STEP_META_STATE_KEY ]
         collections_to_match = self._find_collections_to_match( tool, progress, step )
         # Have implicit collections...
         if collections_to_match.has_collections():
@@ -815,7 +850,10 @@
         # Create new PJA associations with the created job, to be run on completion.
         # PJA Parameter Replacement (only applies to immediate actions-- rename specifically, for now)
         # Pass along replacement dict with the execution of the PJA so we don't have to modify the object.
-        for pja in step.post_job_actions:
+        post_job_actions = step.post_job_actions
+        for key, value in self.runtime_post_job_actions.iteritems():
+            post_job_actions.append( self.__to_pja( key, value, step ) )
+        for pja in post_job_actions:
             if pja.action_type in ActionBox.immediate_actions:
                 ActionBox.execute( self.trans.app, self.trans.sa_session, pja, job, replacement_dict )
             else:
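To make the execute-time merge in modules.py concrete, here is a small self-contained sketch; the namedtuple is a stand-in for the real galaxy.model.PostJobAction, which the real code adds to the SQLAlchemy session rather than a list:

    from collections import namedtuple

    # Stand-in for galaxy.model.PostJobAction; fields mirror the positional
    # arguments used in __to_pja above.
    PostJobAction = namedtuple( "PostJobAction", "action_type step output_name action_arguments" )

    def to_pja( value, step ):
        # action_type is required; output_name and action_arguments are optional.
        return PostJobAction(
            value[ "action_type" ],
            step,
            value.get( "output_name" ),
            value.get( "action_arguments" ),
        )

    step = "step-placeholder"
    workflow_defined_pjas = []  # step.post_job_actions in the real code
    runtime_pjas = {
        "RenameDatasetActionout_file1": {
            "action_type": "RenameDatasetAction",
            "output_name": "out_file1",
            "action_arguments": { "newname": "foo ${replaceme}" },
        },
    }
    # Execute-time merge: runtime PJAs are appended to the workflow-defined ones.
    post_job_actions = list( workflow_defined_pjas )
    for key, value in runtime_pjas.items():
        post_job_actions.append( to_pja( value, step ) )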
diff -r 3fc8ce62285bc3df5b934439e624f2d56cb062cf -r 03955af117563248e950ec97d151c4a4615eaa58 lib/galaxy/workflow/run_request.py
--- a/lib/galaxy/workflow/run_request.py
+++ b/lib/galaxy/workflow/run_request.py
@@ -151,7 +151,7 @@
         else:
             effective_key = key
         value = param_dict[key]
-        if isinstance(value, dict) and not ('src' in value and 'id' in value):
+        if isinstance(value, dict) and (not ('src' in value and 'id' in value) and key != "__POST_JOB_ACTIONS__"):
             new_params.update(_flatten_step_params( value, effective_key) )
         else:
             new_params[effective_key] = value
diff -r 3fc8ce62285bc3df5b934439e624f2d56cb062cf -r 03955af117563248e950ec97d151c4a4615eaa58 test/api/test_workflows.py
--- a/test/api/test_workflows.py
+++ b/test/api/test_workflows.py
@@ -163,8 +163,7 @@
         # Wait for workflow to become fully scheduled and then for all jobs
         # to complete.
         if wait:
-            self.wait_for_invocation( workflow_id, invocation_id )
-            self.dataset_populator.wait_for_history( history_id, assert_ok=True )
+            self._wait_for_workflow( workflow_id, invocation_id, history_id )
         jobs = self._history_jobs( history_id )
         return RunJobsSummary(
             history_id=history_id,
@@ -180,6 +179,12 @@
     def _history_jobs( self, history_id ):
         return self._get("jobs", { "history_id": history_id, "order_by": "create_time" } ).json()
 
+    def _wait_for_workflow( self, workflow_id, invocation_id, history_id, assert_ok=True ):
+        """ Wait for a workflow invocation to be fully scheduled and then for
+        the history to complete. """
+        self.wait_for_invocation( workflow_id, invocation_id )
+        self.dataset_populator.wait_for_history( history_id, assert_ok=assert_ok )
+
 
 # Workflow API TODO:
 # - Allow history_id as param to workflow run action. (hist_id)
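The run_request.py guard above exists because step parameters are normally flattened into nested parameter paths. A self-contained sketch of that flattening, assuming Galaxy's usual "|" separator for nested parameter names:

    def _flatten_step_params( param_dict, prefix="" ):
        # Recurse into nested dicts so {"cond": {"param": 1}} becomes
        # {"cond|param": 1} - but leave dataset references ({'src': ..., 'id': ...})
        # and the __POST_JOB_ACTIONS__ block intact.
        new_params = {}
        for key in param_dict:
            if prefix:
                effective_key = "%s|%s" % ( prefix, key )
            else:
                effective_key = key
            value = param_dict[key]
            if isinstance(value, dict) and (not ('src' in value and 'id' in value) and key != "__POST_JOB_ACTIONS__"):
                new_params.update(_flatten_step_params( value, effective_key ))
            else:
                new_params[effective_key] = value
        return new_params

    params = {
        "cond": { "param": "1" },
        "input1": { "src": "hda", "id": "abc123" },
        "__POST_JOB_ACTIONS__": { "RenameDatasetActionout_file1": { "action_type": "RenameDatasetAction" } },
    }
    flattened = _flatten_step_params( params )
    assert flattened[ "cond|param" ] == "1"
    assert flattened[ "input1" ] == { "src": "hda", "id": "abc123" }
    assert "__POST_JOB_ACTIONS__" in flattened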
@@ -743,6 +748,87 @@
         content = self.dataset_populator.get_history_dataset_details( history_id, wait=True, assert_ok=True )
         assert content[ "name" ] == "foo was replaced"
 
+    @skip_without_tool( "cat1" )
+    def test_run_with_runtime_pja( self ):
+        workflow = self.workflow_populator.load_workflow( name="test_for_pja_runtime" )
+        uuid0, uuid1, uuid2 = str(uuid4()), str(uuid4()), str(uuid4())
+        workflow["steps"]["0"]["uuid"] = uuid0
+        workflow["steps"]["1"]["uuid"] = uuid1
+        workflow["steps"]["2"]["uuid"] = uuid2
+        workflow_request, history_id = self._setup_workflow_run( workflow, inputs_by='step_index' )
+        workflow_request[ "replacement_params" ] = dumps( dict( replaceme="was replaced" ) )
+
+        pja_map = {
+            "RenameDatasetActionout_file1": dict(
+                action_type="RenameDatasetAction",
+                output_name="out_file1",
+                action_arguments=dict( newname="foo ${replaceme}" ),
+            )
+        }
+        workflow_request[ "parameters" ] = dumps({
+            uuid2: { "__POST_JOB_ACTIONS__": pja_map }
+        })
+
+        run_workflow_response = self._post( "workflows", data=workflow_request )
+        self._assert_status_code_is( run_workflow_response, 200 )
+        content = self.dataset_populator.get_history_dataset_details( history_id, wait=True, assert_ok=True )
+        assert content[ "name" ] == "foo was replaced", content[ "name" ]
+
+    @skip_without_tool( "cat1" )
+    def test_run_with_delayed_runtime_pja( self ):
+        workflow_id = self._upload_yaml_workflow("""
+- label: test_input
+  type: input
+- label: first_cat
+  tool_id: cat1
+  state:
+    input1:
+      $link: test_input
+- label: the_pause
+  type: pause
+  connect:
+    input:
+    - first_cat#out_file1
+- label: second_cat
+  tool_id: cat1
+  state:
+    input1:
+      $link: the_pause
+""")
+        downloaded_workflow = self._download_workflow( workflow_id )
+        print downloaded_workflow
+        uuid_dict = dict( map( lambda (index, step): ( int( index ), step["uuid"] ), downloaded_workflow["steps"].iteritems() ) )
+        history_id = self.dataset_populator.new_history()
+        hda = self.dataset_populator.new_dataset( history_id, content="1 2 3" )
+        self.dataset_populator.wait_for_history( history_id )
+        inputs = {
+            '0': self._ds_entry( hda ),
+        }
+        print inputs
+        uuid2 = uuid_dict[ 3 ]
+        workflow_request = {}
+        workflow_request[ "replacement_params" ] = dumps( dict( replaceme="was replaced" ) )
+        pja_map = {
+            "RenameDatasetActionout_file1": dict(
+                action_type="RenameDatasetAction",
+                output_name="out_file1",
+                action_arguments=dict( newname="foo ${replaceme}" ),
+            )
+        }
+        workflow_request[ "parameters" ] = dumps({
+            uuid2: { "__POST_JOB_ACTIONS__": pja_map }
+        })
+        invocation_id = self.__invoke_workflow( history_id, workflow_id, inputs=inputs, request=workflow_request )
+
+        time.sleep( 2 )
+        self.dataset_populator.wait_for_history( history_id )
+        self.__review_paused_steps( workflow_id, invocation_id, order_index=2, action=True )
+
+        self._wait_for_workflow( workflow_id, invocation_id, history_id )
+        time.sleep( 1 )
+        content = self.dataset_populator.get_history_dataset_details( history_id )
+        assert content[ "name" ] == "foo was replaced", content[ "name" ]
+
     @skip_without_tool( "random_lines1" )
     def test_run_replace_params_by_tool( self ):
         workflow_request, history_id = self._setup_random_x2_workflow( "test_for_replace_tool_params" )
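The delayed test above works because the runtime PJAs are serialized with the step's tool state under __STEP_META_STATE__ when the invocation is first scheduled, then restored when the paused step is finally evaluated. A minimal sketch of that round-trip, with plain JSON dicts standing in for real Tool state encoding:

    from json import dumps, loads

    RUNTIME_STEP_META_STATE_KEY = "__STEP_META_STATE__"
    RUNTIME_POST_JOB_ACTIONS_KEY = "__POST_JOB_ACTIONS__"

    # compute_runtime_state(): tuck the runtime-only PJAs into the state that
    # gets persisted with the (possibly delayed) workflow step.
    runtime_pjas = { "RenameDatasetActionout_file1": { "action_type": "RenameDatasetAction" } }
    state_inputs = { "input1": { "src": "hda", "id": "abc123" } }
    state_inputs[ RUNTIME_STEP_META_STATE_KEY ] = { RUNTIME_POST_JOB_ACTIONS_KEY: runtime_pjas }
    persisted = dumps( state_inputs )

    # decode_runtime_state(), much later: restore the meta state, then strip
    # the marker key before the tool actually executes.
    state_dict = loads( persisted )
    meta_state = state_dict.pop( RUNTIME_STEP_META_STATE_KEY, {} )
    restored_pjas = meta_state.get( RUNTIME_POST_JOB_ACTIONS_KEY, {} )
    assert restored_pjas == runtime_pjas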
workflow_request[ "inputs" ] = dumps( inputs ) - workflow_request[ "inputs_by" ] = 'step_index' + def __invoke_workflow( self, history_id, workflow_id, inputs={}, request={}, assert_ok=True ): + request["history"] = "hist_id=%s" % history_id, + if inputs: + request[ "inputs" ] = dumps( inputs ) + request[ "inputs_by" ] = 'step_index' url = "workflows/%s/usage" % ( workflow_id ) - - invocation_response = self._post( url, data=workflow_request ) + invocation_response = self._post( url, data=request ) if assert_ok: self._assert_status_code_is( invocation_response, 200 ) invocation_id = invocation_response.json()[ "id" ] Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.