5 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/6c786abce7e2/ Changeset: 6c786abce7e2 User: jmchilton Date: 2014-03-17 16:44:09 Summary: Start work on stand-alone module for extracting workflows from histories. In particular, this moves get_job_dict out of workflow controller. I am making big changes to this downstream in dataset collection work so I want unit tests - additionally get_job_dict isn't really a great name for what this becomes - so I am renaming it to summarize. Affected #: 3 files diff -r c53fbb2655e2756c8977782df2381d883c5aa8af -r 6c786abce7e2735e80f5f57482f72b57a98a74e7 lib/galaxy/webapps/galaxy/controllers/workflow.py --- a/lib/galaxy/webapps/galaxy/controllers/workflow.py +++ b/lib/galaxy/webapps/galaxy/controllers/workflow.py @@ -23,7 +23,6 @@ from galaxy.tools.parameters import visit_input_values from galaxy.tools.parameters.basic import DataToolParameter, DrillDownSelectToolParameter, SelectToolParameter, UnvalidatedValue from galaxy.tools.parameters.grouping import Conditional, Repeat -from galaxy.util.odict import odict from galaxy.util.sanitize_html import sanitize_html from galaxy.util.topsort import CycleError, topsort, topsort_levels from galaxy.web import error, url_for @@ -33,6 +32,7 @@ from galaxy.web.framework.helpers import to_unicode from galaxy.workflow.modules import module_factory from galaxy.workflow.run import invoke +from galaxy.workflow.extract import summarize class StoredWorkflowListGrid( grids.Grid ): @@ -1188,7 +1188,7 @@ if not user: return trans.show_error_message( "Must be logged in to create workflows" ) if ( job_ids is None and dataset_ids is None ) or workflow_name is None: - jobs, warnings = get_job_dict( trans ) + jobs, warnings = summarize( trans ) # Render return trans.fill_template( "workflow/build_from_current_history.mako", @@ -1211,7 +1211,7 @@ dataset_ids = [ int( id ) for id in dataset_ids ] # Find each job, for security we (implicately) check that they are # associated witha job in the current history. - jobs, warnings = get_job_dict( trans ) + jobs, warnings = summarize( trans ) jobs_by_id = dict( ( job.id, job ) for job in jobs.keys() ) steps = [] steps_by_job_id = {} @@ -1743,48 +1743,6 @@ return None -class FakeJob( object ): - """ - Fake job object for datasets that have no creating_job_associations, - they will be treated as "input" datasets. - """ - def __init__( self, dataset ): - self.is_fake = True - self.id = "fake_%s" % dataset.id - - -def get_job_dict( trans ): - """ - Return a dictionary of Job -> [ Dataset ] mappings, for all finished - active Datasets in the current history and the jobs that created them. 
- """ - history = trans.get_history() - # Get the jobs that created the datasets - warnings = set() - jobs = odict() - for dataset in history.active_datasets: - # FIXME: Create "Dataset.is_finished" - if dataset.state in ( 'new', 'running', 'queued' ): - warnings.add( "Some datasets still queued or running were ignored" ) - continue - - #if this hda was copied from another, we need to find the job that created the origial hda - job_hda = dataset - while job_hda.copied_from_history_dataset_association: - job_hda = job_hda.copied_from_history_dataset_association - - if not job_hda.creating_job_associations: - jobs[ FakeJob( dataset ) ] = [ ( None, dataset ) ] - - for assoc in job_hda.creating_job_associations: - job = assoc.job - if job in jobs: - jobs[ job ].append( ( assoc.name, dataset ) ) - else: - jobs[ job ] = [ ( assoc.name, dataset ) ] - return jobs, warnings - - def cleanup_param_values( inputs, values ): """ Remove 'Data' values from `param_values`, along with metadata cruft, diff -r c53fbb2655e2756c8977782df2381d883c5aa8af -r 6c786abce7e2735e80f5f57482f72b57a98a74e7 lib/galaxy/workflow/extract.py --- /dev/null +++ b/lib/galaxy/workflow/extract.py @@ -0,0 +1,54 @@ +""" This module contains functionality to aid in extracting workflows from +histories. +""" +from galaxy.util.odict import odict + +WARNING_SOME_DATASETS_NOT_READY = "Some datasets still queued or running were ignored" + + +class FakeJob( object ): + """ + Fake job object for datasets that have no creating_job_associations, + they will be treated as "input" datasets. + """ + def __init__( self, dataset ): + self.is_fake = True + self.id = "fake_%s" % dataset.id + + +def summarize( trans, history=None ): + """ Return mapping of job description to datasets for active items in + supplied history - needed for building workflow from a history. + + Formerly call get_job_dict in workflow web controller. 
+ """ + if not history: + history = trans.get_history() + + # Get the jobs that created the datasets + warnings = set() + jobs = odict() + for dataset in history.active_datasets: + # FIXME: Create "Dataset.is_finished" + if dataset.state in ( 'new', 'running', 'queued' ): + warnings.add( WARNING_SOME_DATASETS_NOT_READY ) + continue + + #if this hda was copied from another, we need to find the job that created the origial hda + job_hda = dataset + while job_hda.copied_from_history_dataset_association: + job_hda = job_hda.copied_from_history_dataset_association + + if not job_hda.creating_job_associations: + jobs[ FakeJob( dataset ) ] = [ ( None, dataset ) ] + + for assoc in job_hda.creating_job_associations: + job = assoc.job + if job in jobs: + jobs[ job ].append( ( assoc.name, dataset ) ) + else: + jobs[ job ] = [ ( assoc.name, dataset ) ] + + return jobs, warnings + +__all__ = [ summarize ] diff -r c53fbb2655e2756c8977782df2381d883c5aa8af -r 6c786abce7e2735e80f5f57482f72b57a98a74e7 test/unit/workflows/test_extract_summary.py --- /dev/null +++ b/test/unit/workflows/test_extract_summary.py @@ -0,0 +1,98 @@ +import unittest + +from galaxy import model +from galaxy.workflow import extract + +UNDEFINED_JOB = object() + + +class TestWorkflowExtractSummary( unittest.TestCase ): + + def setUp( self ): + self.history = MockHistory() + self.trans = MockTrans( self.history ) + + def test_empty_history( self ): + job_dict, warnings = extract.summarize( trans=self.trans ) + assert not warnings + assert not job_dict + + def test_summarize_returns_name_and_dataset_list( self ): + # Create two jobs and three datasets, test they are groupped + # by job correctly with correct output names. + hda1 = MockHda() + self.history.active_datasets.append( hda1 ) + hda2 = MockHda( job=hda1.job, output_name="out2" ) + self.history.active_datasets.append( hda2 ) + hda3 = MockHda( output_name="out3" ) + self.history.active_datasets.append( hda3 ) + + job_dict, warnings = extract.summarize( trans=self.trans ) + assert len( job_dict ) == 2 + assert not warnings + self.assertEquals( job_dict[ hda1.job ], [ ( 'out1', hda1 ), ( 'out2', hda2 ) ] ) + self.assertEquals( job_dict[ hda3.job ], [ ( 'out3', hda3 ) ] ) + + def test_finds_original_job_if_copied( self ): + hda = MockHda() + derived_hda_1 = MockHda() + derived_hda_1.copied_from_history_dataset_association = hda + derived_hda_2 = MockHda() + derived_hda_2.copied_from_history_dataset_association = derived_hda_1 + self.history.active_datasets.append( derived_hda_2 ) + job_dict, warnings = extract.summarize( trans=self.trans ) + assert not warnings + assert len( job_dict ) == 1 + self.assertEquals( job_dict[ hda.job ], [ ('out1', derived_hda_2 ) ] ) + + def test_fake_job( self ): + """ Fakes job if creating_job_associations is empty. 
+ """ + hda = MockHda( job=UNDEFINED_JOB ) + self.history.active_datasets.append( hda ) + job_dict, warnings = extract.summarize( trans=self.trans ) + assert not warnings + assert len( job_dict ) == 1 + fake_job = job_dict.keys()[ 0 ] + assert fake_job.id.startswith( "fake_" ) + datasets = job_dict.values()[ 0 ] + assert datasets == [ ( None, hda ) ] + + def test_warns_and_skips_datasets_if_not_finished( self ): + hda = MockHda( state='queued' ) + self.history.active_datasets.append( hda ) + job_dict, warnings = extract.summarize( trans=self.trans ) + assert warnings + assert len( job_dict ) == 0 + + +class MockHistory( object ): + + def __init__( self ): + self.active_datasets = [] + + +class MockTrans( object ): + + def __init__( self, history ): + self.history = history + + def get_history( self ): + return self.history + + +class MockHda( object ): + + def __init__( self, state='ok', output_name='out1', job=None ): + self.id = 123 + self.state = state + self.copied_from_history_dataset_association = None + if job is not UNDEFINED_JOB: + if not job: + job = model.Job() + self.job = job + assoc = model.JobToOutputDatasetAssociation( output_name, self ) + assoc.job = job + self.creating_job_associations = [ assoc ] + else: + self.creating_job_associations = [] https://bitbucket.org/galaxy/galaxy-central/commits/69e29134e64c/ Changeset: 69e29134e64c User: jmchilton Date: 2014-03-17 16:44:09 Summary: Refactor workflow controller step utilities into new module. This will allow for cleaner reuse outside of web controller, clean up API import of attach_steps accordingly. Affected #: 3 files diff -r 6c786abce7e2735e80f5f57482f72b57a98a74e7 -r 69e29134e64c08c09e94a9f69d59329eb2f52107 lib/galaxy/web/base/controller.py --- a/lib/galaxy/web/base/controller.py +++ b/lib/galaxy/web/base/controller.py @@ -41,6 +41,7 @@ from galaxy.tools.parameters.basic import DataToolParameter from galaxy.util.json import to_json_string from galaxy.workflow.modules import ToolModule +from galaxy.workflow.build_util import attach_ordered_steps log = logging.getLogger( __name__ ) @@ -1626,7 +1627,6 @@ """ Creates a workflow from a dict. Created workflow is stored in the database and returned. 
""" - from galaxy.webapps.galaxy.controllers.workflow import attach_ordered_steps # Put parameters in workflow mode trans.workflow_building_mode = True diff -r 6c786abce7e2735e80f5f57482f72b57a98a74e7 -r 69e29134e64c08c09e94a9f69d59329eb2f52107 lib/galaxy/webapps/galaxy/controllers/workflow.py --- a/lib/galaxy/webapps/galaxy/controllers/workflow.py +++ b/lib/galaxy/webapps/galaxy/controllers/workflow.py @@ -4,7 +4,6 @@ import base64 import httplib import json -import math import os import sgmllib import svgfig @@ -24,7 +23,6 @@ from galaxy.tools.parameters.basic import DataToolParameter, DrillDownSelectToolParameter, SelectToolParameter, UnvalidatedValue from galaxy.tools.parameters.grouping import Conditional, Repeat from galaxy.util.sanitize_html import sanitize_html -from galaxy.util.topsort import CycleError, topsort, topsort_levels from galaxy.web import error, url_for from galaxy.web.base.controller import BaseUIController, SharableMixin, UsesStoredWorkflowMixin from galaxy.web.framework import form @@ -33,6 +31,12 @@ from galaxy.workflow.modules import module_factory from galaxy.workflow.run import invoke from galaxy.workflow.extract import summarize +from galaxy.workflow.steps import ( + attach_ordered_steps, + order_workflow_steps, + edgelist_for_workflow_steps, + order_workflow_steps_with_levels, +) class StoredWorkflowListGrid( grids.Grid ): @@ -1692,57 +1696,6 @@ ## ---- Utility methods ------------------------------------------------------- -def attach_ordered_steps( workflow, steps ): - ordered_steps = order_workflow_steps( steps ) - if ordered_steps: - workflow.has_cycles = False - for i, step in enumerate( ordered_steps ): - step.order_index = i - workflow.steps.append( step ) - else: - workflow.has_cycles = True - workflow.steps = steps - - -def edgelist_for_workflow_steps( steps ): - """ - Create a list of tuples representing edges between ``WorkflowSteps`` based - on associated ``WorkflowStepConnection``s - """ - edges = [] - steps_to_index = dict( ( step, i ) for i, step in enumerate( steps ) ) - for step in steps: - edges.append( ( steps_to_index[step], steps_to_index[step] ) ) - for conn in step.input_connections: - edges.append( ( steps_to_index[conn.output_step], steps_to_index[conn.input_step] ) ) - return edges - - -def order_workflow_steps( steps ): - """ - Perform topological sort of the steps, return ordered or None - """ - position_data_available = True - for step in steps: - if not step.position or not 'left' in step.position or not 'top' in step.position: - position_data_available = False - if position_data_available: - steps.sort(cmp=lambda s1, s2: cmp( math.sqrt(s1.position['left'] ** 2 + s1.position['top'] ** 2), math.sqrt(s2.position['left'] ** 2 + s2.position['top'] ** 2))) - try: - edges = edgelist_for_workflow_steps( steps ) - node_order = topsort( edges ) - return [ steps[i] for i in node_order ] - except CycleError: - return None - - -def order_workflow_steps_with_levels( steps ): - try: - return topsort_levels( edgelist_for_workflow_steps( steps ) ) - except CycleError: - return None - - def cleanup_param_values( inputs, values ): """ Remove 'Data' values from `param_values`, along with metadata cruft, diff -r 6c786abce7e2735e80f5f57482f72b57a98a74e7 -r 69e29134e64c08c09e94a9f69d59329eb2f52107 lib/galaxy/workflow/steps.py --- /dev/null +++ b/lib/galaxy/workflow/steps.py @@ -0,0 +1,63 @@ +""" This module contains utility methods for reasoning about and ordering +workflow steps. 
+""" +import math +from galaxy.util.topsort import ( + CycleError, + topsort, + topsort_levels +) + + +def attach_ordered_steps( workflow, steps ): + """ Attempt to topologically order steps and attach to workflow. If this + fails - the workflow contains cycles so it mark it as such. + """ + ordered_steps = order_workflow_steps( steps ) + if ordered_steps: + workflow.has_cycles = False + for i, step in enumerate( ordered_steps ): + step.order_index = i + workflow.steps.append( step ) + else: + workflow.has_cycles = True + workflow.steps = steps + + +def order_workflow_steps( steps ): + """ + Perform topological sort of the steps, return ordered or None + """ + position_data_available = True + for step in steps: + if not step.position or not 'left' in step.position or not 'top' in step.position: + position_data_available = False + if position_data_available: + steps.sort(cmp=lambda s1, s2: cmp( math.sqrt(s1.position['left'] ** 2 + s1.position['top'] ** 2), math.sqrt(s2.position['left'] ** 2 + s2.position['top'] ** 2))) + try: + edges = edgelist_for_workflow_steps( steps ) + node_order = topsort( edges ) + return [ steps[i] for i in node_order ] + except CycleError: + return None + + +def edgelist_for_workflow_steps( steps ): + """ + Create a list of tuples representing edges between ``WorkflowSteps`` based + on associated ``WorkflowStepConnection``s + """ + edges = [] + steps_to_index = dict( ( step, i ) for i, step in enumerate( steps ) ) + for step in steps: + edges.append( ( steps_to_index[step], steps_to_index[step] ) ) + for conn in step.input_connections: + edges.append( ( steps_to_index[conn.output_step], steps_to_index[conn.input_step] ) ) + return edges + + +def order_workflow_steps_with_levels( steps ): + try: + return topsort_levels( edgelist_for_workflow_steps( steps ) ) + except CycleError: + return None https://bitbucket.org/galaxy/galaxy-central/commits/0eb7631a6d67/ Changeset: 0eb7631a6d67 User: jmchilton Date: 2014-03-17 16:44:09 Summary: Refactor code for extracting workflow from history into workflow extract module. Will allow reuse with API controller. 
Affected #: 2 files diff -r 69e29134e64c08c09e94a9f69d59329eb2f52107 -r 0eb7631a6d67317a93b480794d9c4a33a3ceceda lib/galaxy/webapps/galaxy/controllers/workflow.py --- a/lib/galaxy/webapps/galaxy/controllers/workflow.py +++ b/lib/galaxy/webapps/galaxy/controllers/workflow.py @@ -19,9 +19,8 @@ from galaxy.datatypes.data import Data from galaxy.model.item_attrs import UsesItemRatings from galaxy.model.mapping import desc +from galaxy.tools.parameters.basic import DataToolParameter from galaxy.tools.parameters import visit_input_values -from galaxy.tools.parameters.basic import DataToolParameter, DrillDownSelectToolParameter, SelectToolParameter, UnvalidatedValue -from galaxy.tools.parameters.grouping import Conditional, Repeat from galaxy.util.sanitize_html import sanitize_html from galaxy.web import error, url_for from galaxy.web.base.controller import BaseUIController, SharableMixin, UsesStoredWorkflowMixin @@ -31,6 +30,7 @@ from galaxy.workflow.modules import module_factory from galaxy.workflow.run import invoke from galaxy.workflow.extract import summarize +from galaxy.workflow.extract import extract_workflow from galaxy.workflow.steps import ( attach_ordered_steps, order_workflow_steps, @@ -1201,82 +1201,13 @@ history=history ) else: - # Ensure job_ids and dataset_ids are lists (possibly empty) - if job_ids is None: - job_ids = [] - elif type( job_ids ) is not list: - job_ids = [ job_ids ] - if dataset_ids is None: - dataset_ids = [] - elif type( dataset_ids ) is not list: - dataset_ids = [ dataset_ids ] - # Convert both sets of ids to integers - job_ids = [ int( id ) for id in job_ids ] - dataset_ids = [ int( id ) for id in dataset_ids ] - # Find each job, for security we (implicately) check that they are - # associated witha job in the current history. - jobs, warnings = summarize( trans ) - jobs_by_id = dict( ( job.id, job ) for job in jobs.keys() ) - steps = [] - steps_by_job_id = {} - hid_to_output_pair = {} - # Input dataset steps - for hid in dataset_ids: - step = model.WorkflowStep() - step.type = 'data_input' - step.tool_inputs = dict( name="Input Dataset" ) - hid_to_output_pair[ hid ] = ( step, 'output' ) - steps.append( step ) - # Tool steps - for job_id in job_ids: - assert job_id in jobs_by_id, "Attempt to create workflow with job not connected to current history" - job = jobs_by_id[ job_id ] - tool = trans.app.toolbox.get_tool( job.tool_id ) - param_values = job.get_param_values( trans.app, ignore_errors=True ) # If a tool was updated and e.g. had a text value changed to an integer, we don't want a traceback here - associations = cleanup_param_values( tool.inputs, param_values ) - step = model.WorkflowStep() - step.type = 'tool' - step.tool_id = job.tool_id - step.tool_inputs = tool.params_to_strings( param_values, trans.app ) - # NOTE: We shouldn't need to do two passes here since only - # an earlier job can be used as an input to a later - # job. 
- for other_hid, input_name in associations: - if other_hid in hid_to_output_pair: - other_step, other_name = hid_to_output_pair[ other_hid ] - conn = model.WorkflowStepConnection() - conn.input_step = step - conn.input_name = input_name - # Should always be connected to an earlier step - conn.output_step = other_step - conn.output_name = other_name - steps.append( step ) - steps_by_job_id[ job_id ] = step - # Store created dataset hids - for assoc in job.output_datasets: - hid_to_output_pair[ assoc.dataset.hid ] = ( step, assoc.name ) - # Workflow to populate - workflow = model.Workflow() - workflow.name = workflow_name - # Order the steps if possible - attach_ordered_steps( workflow, steps ) - # And let's try to set up some reasonable locations on the canvas - # (these are pretty arbitrary values) - levorder = order_workflow_steps_with_levels( steps ) - base_pos = 10 - for i, steps_at_level in enumerate( levorder ): - for j, index in enumerate( steps_at_level ): - step = steps[ index ] - step.position = dict( top=( base_pos + 120 * j ), - left=( base_pos + 220 * i ) ) - # Store it - stored = model.StoredWorkflow() - stored.user = user - stored.name = workflow_name - workflow.stored_workflow = stored - stored.latest_workflow = workflow - trans.sa_session.add( stored ) - trans.sa_session.flush() + extract_workflow( + trans, + user=user, + job_ids=job_ids, + dataset_ids=dataset_ids, + workflow_name=workflow_name + ) # Index page with message return trans.show_message( "Workflow '%s' created from current history." % workflow_name ) ## return trans.show_ok_message( "<p>Workflow '%s' created.</p><p><a target='_top' href='%s'>Click to load in workflow editor</a></p>" @@ -1696,55 +1627,6 @@ ## ---- Utility methods ------------------------------------------------------- -def cleanup_param_values( inputs, values ): - """ - Remove 'Data' values from `param_values`, along with metadata cruft, - but track the associations. - """ - associations = [] - # dbkey is pushed in by the framework - if 'dbkey' in values: - del values['dbkey'] - root_values = values - - # Recursively clean data inputs and dynamic selects - def cleanup( prefix, inputs, values ): - for key, input in inputs.items(): - if isinstance( input, ( SelectToolParameter, DrillDownSelectToolParameter ) ): - if input.is_dynamic and not isinstance( values[key], UnvalidatedValue ): - values[key] = UnvalidatedValue( values[key] ) - if isinstance( input, DataToolParameter ): - tmp = values[key] - values[key] = None - # HACK: Nested associations are not yet working, but we - # still need to clean them up so we can serialize - # if not( prefix ): - if tmp: # this is false for a non-set optional dataset - if not isinstance(tmp, list): - associations.append( ( tmp.hid, prefix + key ) ) - else: - associations.extend( [ (t.hid, prefix + key) for t in tmp] ) - - # Cleanup the other deprecated crap associated with datasets - # as well. Worse, for nested datasets all the metadata is - # being pushed into the root. 
FIXME: MUST REMOVE SOON - key = prefix + key + "_" - for k in root_values.keys(): - if k.startswith( key ): - del root_values[k] - elif isinstance( input, Repeat ): - group_values = values[key] - for i, rep_values in enumerate( group_values ): - rep_index = rep_values['__index__'] - cleanup( "%s%s_%d|" % (prefix, key, rep_index ), input.inputs, group_values[i] ) - elif isinstance( input, Conditional ): - group_values = values[input.name] - current_case = group_values['__current_case__'] - cleanup( "%s%s|" % ( prefix, key ), input.cases[current_case].inputs, group_values ) - cleanup( "", inputs, values ) - return associations - - def _build_workflow_on_str(instance_ds_names): # Returns suffix for new histories based on multi input iteration num_multi_inputs = len(instance_ds_names) diff -r 69e29134e64c08c09e94a9f69d59329eb2f52107 -r 0eb7631a6d67317a93b480794d9c4a33a3ceceda lib/galaxy/workflow/extract.py --- a/lib/galaxy/workflow/extract.py +++ b/lib/galaxy/workflow/extract.py @@ -2,10 +2,108 @@ histories. """ from galaxy.util.odict import odict +from galaxy import model +from galaxy.tools.parameters.basic import ( + DataToolParameter, + DrillDownSelectToolParameter, + SelectToolParameter, + UnvalidatedValue +) +from galaxy.tools.parameters.grouping import ( + Conditional, + Repeat +) +from .steps import ( + attach_ordered_steps, + order_workflow_steps_with_levels +) WARNING_SOME_DATASETS_NOT_READY = "Some datasets still queued or running were ignored" +def extract_workflow( trans, user, history=None, job_ids=None, dataset_ids=None, workflow_name=None ): + steps = extract_steps( trans, history=history, job_ids=job_ids, dataset_ids=dataset_ids ) + # Workflow to populate + workflow = model.Workflow() + workflow.name = workflow_name + # Order the steps if possible + attach_ordered_steps( workflow, steps ) + # And let's try to set up some reasonable locations on the canvas + # (these are pretty arbitrary values) + levorder = order_workflow_steps_with_levels( steps ) + base_pos = 10 + for i, steps_at_level in enumerate( levorder ): + for j, index in enumerate( steps_at_level ): + step = steps[ index ] + step.position = dict( top=( base_pos + 120 * j ), + left=( base_pos + 220 * i ) ) + # Store it + stored = model.StoredWorkflow() + stored.user = user + stored.name = workflow_name + workflow.stored_workflow = stored + stored.latest_workflow = workflow + trans.sa_session.add( stored ) + trans.sa_session.flush() + return stored + + +def extract_steps( trans, history=None, job_ids=None, dataset_ids=None ): + # Ensure job_ids and dataset_ids are lists (possibly empty) + if job_ids is None: + job_ids = [] + elif type( job_ids ) is not list: + job_ids = [ job_ids ] + if dataset_ids is None: + dataset_ids = [] + elif type( dataset_ids ) is not list: + dataset_ids = [ dataset_ids ] + # Convert both sets of ids to integers + job_ids = [ int( id ) for id in job_ids ] + dataset_ids = [ int( id ) for id in dataset_ids ] + # Find each job, for security we (implicately) check that they are + # associated witha job in the current history. 
+ jobs, warnings = summarize( trans, history=history ) + jobs_by_id = dict( ( job.id, job ) for job in jobs.keys() ) + steps = [] + steps_by_job_id = {} + hid_to_output_pair = {} + # Input dataset steps + for hid in dataset_ids: + step = model.WorkflowStep() + step.type = 'data_input' + step.tool_inputs = dict( name="Input Dataset" ) + hid_to_output_pair[ hid ] = ( step, 'output' ) + steps.append( step ) + # Tool steps + for job_id in job_ids: + assert job_id in jobs_by_id, "Attempt to create workflow with job not connected to current history" + job = jobs_by_id[ job_id ] + tool_inputs, associations = step_inputs( trans, job ) + step = model.WorkflowStep() + step.type = 'tool' + step.tool_id = job.tool_id + step.tool_inputs = tool_inputs + # NOTE: We shouldn't need to do two passes here since only + # an earlier job can be used as an input to a later + # job. + for other_hid, input_name in associations: + if other_hid in hid_to_output_pair: + other_step, other_name = hid_to_output_pair[ other_hid ] + conn = model.WorkflowStepConnection() + conn.input_step = step + conn.input_name = input_name + # Should always be connected to an earlier step + conn.output_step = other_step + conn.output_name = other_name + steps.append( step ) + steps_by_job_id[ job_id ] = step + # Store created dataset hids + for assoc in job.output_datasets: + hid_to_output_pair[ assoc.dataset.hid ] = ( step, assoc.name ) + return steps + + class FakeJob( object ): """ Fake job object for datasets that have no creating_job_associations, @@ -51,4 +149,61 @@ return jobs, warnings -__all__ = [ summarize ] + +def step_inputs( trans, job ): + tool = trans.app.toolbox.get_tool( job.tool_id ) + param_values = job.get_param_values( trans.app, ignore_errors=True ) # If a tool was updated and e.g. had a text value changed to an integer, we don't want a traceback here + associations = __cleanup_param_values( tool.inputs, param_values ) + tool_inputs = tool.params_to_strings( param_values, trans.app ) + return tool_inputs, associations + + +def __cleanup_param_values( inputs, values ): + """ + Remove 'Data' values from `param_values`, along with metadata cruft, + but track the associations. + """ + associations = [] + # dbkey is pushed in by the framework + if 'dbkey' in values: + del values['dbkey'] + root_values = values + + # Recursively clean data inputs and dynamic selects + def cleanup( prefix, inputs, values ): + for key, input in inputs.items(): + if isinstance( input, ( SelectToolParameter, DrillDownSelectToolParameter ) ): + if input.is_dynamic and not isinstance( values[key], UnvalidatedValue ): + values[key] = UnvalidatedValue( values[key] ) + if isinstance( input, DataToolParameter ): + tmp = values[key] + values[key] = None + # HACK: Nested associations are not yet working, but we + # still need to clean them up so we can serialize + # if not( prefix ): + if tmp: # this is false for a non-set optional dataset + if not isinstance(tmp, list): + associations.append( ( tmp.hid, prefix + key ) ) + else: + associations.extend( [ (t.hid, prefix + key) for t in tmp] ) + + # Cleanup the other deprecated crap associated with datasets + # as well. Worse, for nested datasets all the metadata is + # being pushed into the root. 
FIXME: MUST REMOVE SOON + key = prefix + key + "_" + for k in root_values.keys(): + if k.startswith( key ): + del root_values[k] + elif isinstance( input, Repeat ): + group_values = values[key] + for i, rep_values in enumerate( group_values ): + rep_index = rep_values['__index__'] + cleanup( "%s%s_%d|" % (prefix, key, rep_index ), input.inputs, group_values[i] ) + elif isinstance( input, Conditional ): + group_values = values[input.name] + current_case = group_values['__current_case__'] + cleanup( "%s%s|" % ( prefix, key ), input.cases[current_case].inputs, group_values ) + cleanup( "", inputs, values ) + return associations + +__all__ = [ summarize, extract_workflow ] https://bitbucket.org/galaxy/galaxy-central/commits/17a2dc7d3902/ Changeset: 17a2dc7d3902 User: jmchilton Date: 2014-03-17 16:44:09 Summary: Add ability to extract workflow from a history to workflows API. Affected #: 2 files diff -r 0eb7631a6d67317a93b480794d9c4a33a3ceceda -r 17a2dc7d39027d97a4185adbd03944b4bb1642cf lib/galaxy/webapps/galaxy/api/workflows.py --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -11,8 +11,10 @@ from galaxy import web from galaxy.web import _future_expose_api as expose_api from galaxy.web.base.controller import BaseAPIController, url_for, UsesStoredWorkflowMixin +from galaxy.web.base.controller import UsesHistoryMixin from galaxy.workflow.modules import module_factory from galaxy.workflow.run import invoke +from galaxy.workflow.extract import extract_workflow log = logging.getLogger(__name__) @@ -51,7 +53,7 @@ step.state.inputs.update(param_dict) -class WorkflowsAPIController(BaseAPIController, UsesStoredWorkflowMixin): +class WorkflowsAPIController(BaseAPIController, UsesStoredWorkflowMixin, UsesHistoryMixin): @web.expose_api def index(self, trans, **kwd): @@ -141,9 +143,11 @@ """ POST /api/workflows - We're not creating workflows from the api. Just execute for now. + Run or create workflows from the api. - However, we will import them if installed_repository_file is specified. + If installed_repository_file or from_history_id is specified a new + workflow will be created for this user. Otherwise, workflow_id must be + specified and this API method will cause a workflow to execute. :param installed_repository_file The path of a workflow to import. Either workflow_id or installed_repository_file must be specified :type installed_repository_file str @@ -165,6 +169,15 @@ :param replacement_params: A dictionary used when renaming datasets :type replacement_params: dict + + :param from_history_id: Id of history to extract a workflow from. Should not be used with worfklow_id or installed_repository_file. + :type from_history_id: str + + :param job_ids: If from_history_id is set - this should be a list of jobs to include when extracting workflow from history. + :type job_ids: str + + :param dataset_ids: If from_history_id is set - this should be a list of HDA ids corresponding to workflow inputs when extracting workflow from history. + :type dataset_ids: str """ # Pull parameters out of payload. 
@@ -183,6 +196,24 @@ cntrller='api', **payload) return result + if 'from_history_id' in payload: + from_history_id = payload.get( 'from_history_id' ) + history = self.get_history( trans, from_history_id, check_ownership=False, check_accessible=True ) + job_ids = map( trans.security.decode_id, payload.get( "job_ids", [] ) ) + dataset_ids = map( trans.security.decode_id, payload.get( "dataset_ids", [] ) ) + workflow_name = payload[ "workflow_name" ] + stored_workflow = extract_workflow( + trans=trans, + user=trans.get_user(), + history=history, + job_ids=job_ids, + dataset_ids=dataset_ids, + workflow_name=workflow_name, + ) + item = stored_workflow.to_dict( value_mapper={ "id": trans.security.encode_id } ) + item[ 'url' ] = url_for( 'workflow', id=item[ "id" ] ) + return item + trans.response.status = 403 return "Either workflow_id or installed_repository_file must be specified" if 'installed_repository_file' in payload: diff -r 0eb7631a6d67317a93b480794d9c4a33a3ceceda -r 17a2dc7d39027d97a4185adbd03944b4bb1642cf test/functional/api/test_workflows.py --- a/test/functional/api/test_workflows.py +++ b/test/functional/api/test_workflows.py @@ -67,6 +67,39 @@ self._assert_status_code_is( run_workflow_response, 200 ) self._wait_for_history( history_id, assert_ok=True ) + def test_extract_from_history( self ): + workflow = self.workflow_populator.load_workflow( name="test_for_extract" ) + workflow_request, history_id = self._setup_workflow_run( workflow ) + contents_response = self._get( "histories/%s/contents" % history_id ) + self._assert_status_code_is( contents_response, 200 ) + hda_ids = map( lambda c: c[ "id" ], contents_response.json() ) + + run_workflow_response = self._post( "workflows", data=workflow_request ) + self._assert_status_code_is( run_workflow_response, 200 ) + + self._wait_for_history( history_id, assert_ok=True ) + data = dict( history_id=history_id, tool_id="cat1" ) + jobs_response = self._get( "jobs", data=data ) + self._assert_status_code_is( jobs_response, 200 ) + cat1_job_id = jobs_response.json()[ 0 ][ "id" ] + + contents_response = self._get( "history/%s/contents", data=data ) + create_from_data = dict( + from_history_id=history_id, + dataset_ids=dumps( hda_ids ), + job_ids=dumps( [ cat1_job_id ] ), + workflow_name="test import from history", + ) + run_workflow_response = self._post( "workflows", data=create_from_data ) + self._assert_status_code_is( run_workflow_response, 200 ) + + new_workflow_id = run_workflow_response.json()[ "id" ] + download_response = self._get( "workflows/%s/download" % new_workflow_id ) + self._assert_status_code_is( download_response, 200 ) + downloaded_workflow = download_response.json() + self.assertEquals( downloaded_workflow[ "name" ], "test import from history" ) + assert len( downloaded_workflow[ "steps" ] ) == 3 + def test_run_replace_params_by_tool( self ): workflow_request, history_id = self._setup_random_x2_workflow( "test_for_replace_tool_params" ) workflow_request[ "parameters" ] = dumps( dict( random_lines1=dict( num_lines=5 ) ) ) https://bitbucket.org/galaxy/galaxy-central/commits/6d0c89fc3558/ Changeset: 6d0c89fc3558 User: jmchilton Date: 2014-03-26 22:21:15 Summary: Merged in jmchilton/galaxy-central-fork-1 (pull request #348) Enhance workflows API to allow extracting workflow from history. 
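Since the merged pull request is ultimately about the API surface, here is a rough idea of how a client might exercise the new extraction path. This is a hedged sketch using the requests library; the Galaxy URL, API key and encoded ids are placeholders, and authentication via a "key" payload parameter is assumed rather than taken from this changeset. As in the functional test, job_ids and dataset_ids are JSON-encoded lists of encoded ids.

    # Hypothetical client-side call to the new POST /api/workflows extraction path.
    # GALAXY_URL, API_KEY and the encoded ids below are placeholders, not real values.
    import json
    import requests

    GALAXY_URL = "https://galaxy.example.org"
    API_KEY = "your-api-key"

    payload = {
        "key": API_KEY,
        "from_history_id": "ebfb8f50c6abde6d",                 # encoded history id
        "job_ids": json.dumps( [ "d413a19dec13d11e" ] ),        # encoded ids of jobs to become tool steps
        "dataset_ids": json.dumps( [ "f2db41e1fa331b3e" ] ),    # encoded HDA ids to expose as workflow inputs
        "workflow_name": "Extracted from history",
    }

    response = requests.post( "%s/api/workflows" % GALAXY_URL, data=payload )
    response.raise_for_status()
    created = response.json()
    print created[ "id" ], created[ "url" ]

The returned dictionary mirrors StoredWorkflow.to_dict() with an encoded id, so that id can be passed straight to workflows/<id>/download, which is what the accompanying functional test does to verify the extracted workflow.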
Affected #: 7 files diff -r 90b4baa5c2b18d49bcea5a0807f0df8924abebb1 -r 6d0c89fc3558d7185daaa1dbda05e6d15879ff00 lib/galaxy/web/base/controller.py --- a/lib/galaxy/web/base/controller.py +++ b/lib/galaxy/web/base/controller.py @@ -41,6 +41,7 @@ from galaxy.tools.parameters.basic import DataToolParameter from galaxy.util.json import to_json_string from galaxy.workflow.modules import ToolModule +from galaxy.workflow.build_util import attach_ordered_steps log = logging.getLogger( __name__ ) @@ -1676,7 +1677,6 @@ """ Creates a workflow from a dict. Created workflow is stored in the database and returned. """ - from galaxy.webapps.galaxy.controllers.workflow import attach_ordered_steps # Put parameters in workflow mode trans.workflow_building_mode = True diff -r 90b4baa5c2b18d49bcea5a0807f0df8924abebb1 -r 6d0c89fc3558d7185daaa1dbda05e6d15879ff00 lib/galaxy/webapps/galaxy/api/workflows.py --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -11,8 +11,10 @@ from galaxy import web from galaxy.web import _future_expose_api as expose_api from galaxy.web.base.controller import BaseAPIController, url_for, UsesStoredWorkflowMixin +from galaxy.web.base.controller import UsesHistoryMixin from galaxy.workflow.modules import module_factory from galaxy.workflow.run import invoke +from galaxy.workflow.extract import extract_workflow log = logging.getLogger(__name__) @@ -51,7 +53,7 @@ step.state.inputs.update(param_dict) -class WorkflowsAPIController(BaseAPIController, UsesStoredWorkflowMixin): +class WorkflowsAPIController(BaseAPIController, UsesStoredWorkflowMixin, UsesHistoryMixin): @web.expose_api def index(self, trans, **kwd): @@ -141,9 +143,11 @@ """ POST /api/workflows - We're not creating workflows from the api. Just execute for now. + Run or create workflows from the api. - However, we will import them if installed_repository_file is specified. + If installed_repository_file or from_history_id is specified a new + workflow will be created for this user. Otherwise, workflow_id must be + specified and this API method will cause a workflow to execute. :param installed_repository_file The path of a workflow to import. Either workflow_id or installed_repository_file must be specified :type installed_repository_file str @@ -165,6 +169,15 @@ :param replacement_params: A dictionary used when renaming datasets :type replacement_params: dict + + :param from_history_id: Id of history to extract a workflow from. Should not be used with worfklow_id or installed_repository_file. + :type from_history_id: str + + :param job_ids: If from_history_id is set - this should be a list of jobs to include when extracting workflow from history. + :type job_ids: str + + :param dataset_ids: If from_history_id is set - this should be a list of HDA ids corresponding to workflow inputs when extracting workflow from history. + :type dataset_ids: str """ # Pull parameters out of payload. 
@@ -183,6 +196,24 @@ cntrller='api', **payload) return result + if 'from_history_id' in payload: + from_history_id = payload.get( 'from_history_id' ) + history = self.get_history( trans, from_history_id, check_ownership=False, check_accessible=True ) + job_ids = map( trans.security.decode_id, payload.get( "job_ids", [] ) ) + dataset_ids = map( trans.security.decode_id, payload.get( "dataset_ids", [] ) ) + workflow_name = payload[ "workflow_name" ] + stored_workflow = extract_workflow( + trans=trans, + user=trans.get_user(), + history=history, + job_ids=job_ids, + dataset_ids=dataset_ids, + workflow_name=workflow_name, + ) + item = stored_workflow.to_dict( value_mapper={ "id": trans.security.encode_id } ) + item[ 'url' ] = url_for( 'workflow', id=item[ "id" ] ) + return item + trans.response.status = 403 return "Either workflow_id or installed_repository_file must be specified" if 'installed_repository_file' in payload: diff -r 90b4baa5c2b18d49bcea5a0807f0df8924abebb1 -r 6d0c89fc3558d7185daaa1dbda05e6d15879ff00 lib/galaxy/webapps/galaxy/controllers/workflow.py --- a/lib/galaxy/webapps/galaxy/controllers/workflow.py +++ b/lib/galaxy/webapps/galaxy/controllers/workflow.py @@ -4,7 +4,6 @@ import base64 import httplib import json -import math import os import sgmllib import svgfig @@ -20,12 +19,9 @@ from galaxy.datatypes.data import Data from galaxy.model.item_attrs import UsesItemRatings from galaxy.model.mapping import desc +from galaxy.tools.parameters.basic import DataToolParameter from galaxy.tools.parameters import visit_input_values -from galaxy.tools.parameters.basic import DataToolParameter, DrillDownSelectToolParameter, SelectToolParameter, UnvalidatedValue -from galaxy.tools.parameters.grouping import Conditional, Repeat -from galaxy.util.odict import odict from galaxy.util.sanitize_html import sanitize_html -from galaxy.util.topsort import CycleError, topsort, topsort_levels from galaxy.web import error, url_for from galaxy.web.base.controller import BaseUIController, SharableMixin, UsesStoredWorkflowMixin from galaxy.web.framework import form @@ -33,6 +29,14 @@ from galaxy.web.framework.helpers import to_unicode from galaxy.workflow.modules import module_factory from galaxy.workflow.run import invoke +from galaxy.workflow.extract import summarize +from galaxy.workflow.extract import extract_workflow +from galaxy.workflow.steps import ( + attach_ordered_steps, + order_workflow_steps, + edgelist_for_workflow_steps, + order_workflow_steps_with_levels, +) class StoredWorkflowListGrid( grids.Grid ): @@ -1188,7 +1192,7 @@ if not user: return trans.show_error_message( "Must be logged in to create workflows" ) if ( job_ids is None and dataset_ids is None ) or workflow_name is None: - jobs, warnings = get_job_dict( trans ) + jobs, warnings = summarize( trans ) # Render return trans.fill_template( "workflow/build_from_current_history.mako", @@ -1197,82 +1201,13 @@ history=history ) else: - # Ensure job_ids and dataset_ids are lists (possibly empty) - if job_ids is None: - job_ids = [] - elif type( job_ids ) is not list: - job_ids = [ job_ids ] - if dataset_ids is None: - dataset_ids = [] - elif type( dataset_ids ) is not list: - dataset_ids = [ dataset_ids ] - # Convert both sets of ids to integers - job_ids = [ int( id ) for id in job_ids ] - dataset_ids = [ int( id ) for id in dataset_ids ] - # Find each job, for security we (implicately) check that they are - # associated witha job in the current history. 
- jobs, warnings = get_job_dict( trans ) - jobs_by_id = dict( ( job.id, job ) for job in jobs.keys() ) - steps = [] - steps_by_job_id = {} - hid_to_output_pair = {} - # Input dataset steps - for hid in dataset_ids: - step = model.WorkflowStep() - step.type = 'data_input' - step.tool_inputs = dict( name="Input Dataset" ) - hid_to_output_pair[ hid ] = ( step, 'output' ) - steps.append( step ) - # Tool steps - for job_id in job_ids: - assert job_id in jobs_by_id, "Attempt to create workflow with job not connected to current history" - job = jobs_by_id[ job_id ] - tool = trans.app.toolbox.get_tool( job.tool_id ) - param_values = job.get_param_values( trans.app, ignore_errors=True ) # If a tool was updated and e.g. had a text value changed to an integer, we don't want a traceback here - associations = cleanup_param_values( tool.inputs, param_values ) - step = model.WorkflowStep() - step.type = 'tool' - step.tool_id = job.tool_id - step.tool_inputs = tool.params_to_strings( param_values, trans.app ) - # NOTE: We shouldn't need to do two passes here since only - # an earlier job can be used as an input to a later - # job. - for other_hid, input_name in associations: - if other_hid in hid_to_output_pair: - other_step, other_name = hid_to_output_pair[ other_hid ] - conn = model.WorkflowStepConnection() - conn.input_step = step - conn.input_name = input_name - # Should always be connected to an earlier step - conn.output_step = other_step - conn.output_name = other_name - steps.append( step ) - steps_by_job_id[ job_id ] = step - # Store created dataset hids - for assoc in job.output_datasets: - hid_to_output_pair[ assoc.dataset.hid ] = ( step, assoc.name ) - # Workflow to populate - workflow = model.Workflow() - workflow.name = workflow_name - # Order the steps if possible - attach_ordered_steps( workflow, steps ) - # And let's try to set up some reasonable locations on the canvas - # (these are pretty arbitrary values) - levorder = order_workflow_steps_with_levels( steps ) - base_pos = 10 - for i, steps_at_level in enumerate( levorder ): - for j, index in enumerate( steps_at_level ): - step = steps[ index ] - step.position = dict( top=( base_pos + 120 * j ), - left=( base_pos + 220 * i ) ) - # Store it - stored = model.StoredWorkflow() - stored.user = user - stored.name = workflow_name - workflow.stored_workflow = stored - stored.latest_workflow = workflow - trans.sa_session.add( stored ) - trans.sa_session.flush() + extract_workflow( + trans, + user=user, + job_ids=job_ids, + dataset_ids=dataset_ids, + workflow_name=workflow_name + ) # Index page with message return trans.show_message( "Workflow '%s' created from current history." 
% workflow_name ) ## return trans.show_ok_message( "<p>Workflow '%s' created.</p><p><a target='_top' href='%s'>Click to load in workflow editor</a></p>" @@ -1692,148 +1627,6 @@ ## ---- Utility methods ------------------------------------------------------- -def attach_ordered_steps( workflow, steps ): - ordered_steps = order_workflow_steps( steps ) - if ordered_steps: - workflow.has_cycles = False - for i, step in enumerate( ordered_steps ): - step.order_index = i - workflow.steps.append( step ) - else: - workflow.has_cycles = True - workflow.steps = steps - - -def edgelist_for_workflow_steps( steps ): - """ - Create a list of tuples representing edges between ``WorkflowSteps`` based - on associated ``WorkflowStepConnection``s - """ - edges = [] - steps_to_index = dict( ( step, i ) for i, step in enumerate( steps ) ) - for step in steps: - edges.append( ( steps_to_index[step], steps_to_index[step] ) ) - for conn in step.input_connections: - edges.append( ( steps_to_index[conn.output_step], steps_to_index[conn.input_step] ) ) - return edges - - -def order_workflow_steps( steps ): - """ - Perform topological sort of the steps, return ordered or None - """ - position_data_available = True - for step in steps: - if not step.position or not 'left' in step.position or not 'top' in step.position: - position_data_available = False - if position_data_available: - steps.sort(cmp=lambda s1, s2: cmp( math.sqrt(s1.position['left'] ** 2 + s1.position['top'] ** 2), math.sqrt(s2.position['left'] ** 2 + s2.position['top'] ** 2))) - try: - edges = edgelist_for_workflow_steps( steps ) - node_order = topsort( edges ) - return [ steps[i] for i in node_order ] - except CycleError: - return None - - -def order_workflow_steps_with_levels( steps ): - try: - return topsort_levels( edgelist_for_workflow_steps( steps ) ) - except CycleError: - return None - - -class FakeJob( object ): - """ - Fake job object for datasets that have no creating_job_associations, - they will be treated as "input" datasets. - """ - def __init__( self, dataset ): - self.is_fake = True - self.id = "fake_%s" % dataset.id - - -def get_job_dict( trans ): - """ - Return a dictionary of Job -> [ Dataset ] mappings, for all finished - active Datasets in the current history and the jobs that created them. - """ - history = trans.get_history() - # Get the jobs that created the datasets - warnings = set() - jobs = odict() - for dataset in history.active_datasets: - # FIXME: Create "Dataset.is_finished" - if dataset.state in ( 'new', 'running', 'queued' ): - warnings.add( "Some datasets still queued or running were ignored" ) - continue - - #if this hda was copied from another, we need to find the job that created the origial hda - job_hda = dataset - while job_hda.copied_from_history_dataset_association: - job_hda = job_hda.copied_from_history_dataset_association - - if not job_hda.creating_job_associations: - jobs[ FakeJob( dataset ) ] = [ ( None, dataset ) ] - - for assoc in job_hda.creating_job_associations: - job = assoc.job - if job in jobs: - jobs[ job ].append( ( assoc.name, dataset ) ) - else: - jobs[ job ] = [ ( assoc.name, dataset ) ] - return jobs, warnings - - -def cleanup_param_values( inputs, values ): - """ - Remove 'Data' values from `param_values`, along with metadata cruft, - but track the associations. 
- """ - associations = [] - # dbkey is pushed in by the framework - if 'dbkey' in values: - del values['dbkey'] - root_values = values - - # Recursively clean data inputs and dynamic selects - def cleanup( prefix, inputs, values ): - for key, input in inputs.items(): - if isinstance( input, ( SelectToolParameter, DrillDownSelectToolParameter ) ): - if input.is_dynamic and not isinstance( values[key], UnvalidatedValue ): - values[key] = UnvalidatedValue( values[key] ) - if isinstance( input, DataToolParameter ): - tmp = values[key] - values[key] = None - # HACK: Nested associations are not yet working, but we - # still need to clean them up so we can serialize - # if not( prefix ): - if tmp: # this is false for a non-set optional dataset - if not isinstance(tmp, list): - associations.append( ( tmp.hid, prefix + key ) ) - else: - associations.extend( [ (t.hid, prefix + key) for t in tmp] ) - - # Cleanup the other deprecated crap associated with datasets - # as well. Worse, for nested datasets all the metadata is - # being pushed into the root. FIXME: MUST REMOVE SOON - key = prefix + key + "_" - for k in root_values.keys(): - if k.startswith( key ): - del root_values[k] - elif isinstance( input, Repeat ): - group_values = values[key] - for i, rep_values in enumerate( group_values ): - rep_index = rep_values['__index__'] - cleanup( "%s%s_%d|" % (prefix, key, rep_index ), input.inputs, group_values[i] ) - elif isinstance( input, Conditional ): - group_values = values[input.name] - current_case = group_values['__current_case__'] - cleanup( "%s%s|" % ( prefix, key ), input.cases[current_case].inputs, group_values ) - cleanup( "", inputs, values ) - return associations - - def _build_workflow_on_str(instance_ds_names): # Returns suffix for new histories based on multi input iteration num_multi_inputs = len(instance_ds_names) diff -r 90b4baa5c2b18d49bcea5a0807f0df8924abebb1 -r 6d0c89fc3558d7185daaa1dbda05e6d15879ff00 lib/galaxy/workflow/extract.py --- /dev/null +++ b/lib/galaxy/workflow/extract.py @@ -0,0 +1,209 @@ +""" This module contains functionality to aid in extracting workflows from +histories. 
+""" +from galaxy.util.odict import odict +from galaxy import model +from galaxy.tools.parameters.basic import ( + DataToolParameter, + DrillDownSelectToolParameter, + SelectToolParameter, + UnvalidatedValue +) +from galaxy.tools.parameters.grouping import ( + Conditional, + Repeat +) +from .steps import ( + attach_ordered_steps, + order_workflow_steps_with_levels +) + +WARNING_SOME_DATASETS_NOT_READY = "Some datasets still queued or running were ignored" + + +def extract_workflow( trans, user, history=None, job_ids=None, dataset_ids=None, workflow_name=None ): + steps = extract_steps( trans, history=history, job_ids=job_ids, dataset_ids=dataset_ids ) + # Workflow to populate + workflow = model.Workflow() + workflow.name = workflow_name + # Order the steps if possible + attach_ordered_steps( workflow, steps ) + # And let's try to set up some reasonable locations on the canvas + # (these are pretty arbitrary values) + levorder = order_workflow_steps_with_levels( steps ) + base_pos = 10 + for i, steps_at_level in enumerate( levorder ): + for j, index in enumerate( steps_at_level ): + step = steps[ index ] + step.position = dict( top=( base_pos + 120 * j ), + left=( base_pos + 220 * i ) ) + # Store it + stored = model.StoredWorkflow() + stored.user = user + stored.name = workflow_name + workflow.stored_workflow = stored + stored.latest_workflow = workflow + trans.sa_session.add( stored ) + trans.sa_session.flush() + return stored + + +def extract_steps( trans, history=None, job_ids=None, dataset_ids=None ): + # Ensure job_ids and dataset_ids are lists (possibly empty) + if job_ids is None: + job_ids = [] + elif type( job_ids ) is not list: + job_ids = [ job_ids ] + if dataset_ids is None: + dataset_ids = [] + elif type( dataset_ids ) is not list: + dataset_ids = [ dataset_ids ] + # Convert both sets of ids to integers + job_ids = [ int( id ) for id in job_ids ] + dataset_ids = [ int( id ) for id in dataset_ids ] + # Find each job, for security we (implicately) check that they are + # associated witha job in the current history. + jobs, warnings = summarize( trans, history=history ) + jobs_by_id = dict( ( job.id, job ) for job in jobs.keys() ) + steps = [] + steps_by_job_id = {} + hid_to_output_pair = {} + # Input dataset steps + for hid in dataset_ids: + step = model.WorkflowStep() + step.type = 'data_input' + step.tool_inputs = dict( name="Input Dataset" ) + hid_to_output_pair[ hid ] = ( step, 'output' ) + steps.append( step ) + # Tool steps + for job_id in job_ids: + assert job_id in jobs_by_id, "Attempt to create workflow with job not connected to current history" + job = jobs_by_id[ job_id ] + tool_inputs, associations = step_inputs( trans, job ) + step = model.WorkflowStep() + step.type = 'tool' + step.tool_id = job.tool_id + step.tool_inputs = tool_inputs + # NOTE: We shouldn't need to do two passes here since only + # an earlier job can be used as an input to a later + # job. 
+ for other_hid, input_name in associations: + if other_hid in hid_to_output_pair: + other_step, other_name = hid_to_output_pair[ other_hid ] + conn = model.WorkflowStepConnection() + conn.input_step = step + conn.input_name = input_name + # Should always be connected to an earlier step + conn.output_step = other_step + conn.output_name = other_name + steps.append( step ) + steps_by_job_id[ job_id ] = step + # Store created dataset hids + for assoc in job.output_datasets: + hid_to_output_pair[ assoc.dataset.hid ] = ( step, assoc.name ) + return steps + + +class FakeJob( object ): + """ + Fake job object for datasets that have no creating_job_associations, + they will be treated as "input" datasets. + """ + def __init__( self, dataset ): + self.is_fake = True + self.id = "fake_%s" % dataset.id + + +def summarize( trans, history=None ): + """ Return mapping of job description to datasets for active items in + supplied history - needed for building workflow from a history. + + Formerly call get_job_dict in workflow web controller. + """ + if not history: + history = trans.get_history() + + # Get the jobs that created the datasets + warnings = set() + jobs = odict() + for dataset in history.active_datasets: + # FIXME: Create "Dataset.is_finished" + if dataset.state in ( 'new', 'running', 'queued' ): + warnings.add( WARNING_SOME_DATASETS_NOT_READY ) + continue + + #if this hda was copied from another, we need to find the job that created the origial hda + job_hda = dataset + while job_hda.copied_from_history_dataset_association: + job_hda = job_hda.copied_from_history_dataset_association + + if not job_hda.creating_job_associations: + jobs[ FakeJob( dataset ) ] = [ ( None, dataset ) ] + + for assoc in job_hda.creating_job_associations: + job = assoc.job + if job in jobs: + jobs[ job ].append( ( assoc.name, dataset ) ) + else: + jobs[ job ] = [ ( assoc.name, dataset ) ] + + return jobs, warnings + + +def step_inputs( trans, job ): + tool = trans.app.toolbox.get_tool( job.tool_id ) + param_values = job.get_param_values( trans.app, ignore_errors=True ) # If a tool was updated and e.g. had a text value changed to an integer, we don't want a traceback here + associations = __cleanup_param_values( tool.inputs, param_values ) + tool_inputs = tool.params_to_strings( param_values, trans.app ) + return tool_inputs, associations + + +def __cleanup_param_values( inputs, values ): + """ + Remove 'Data' values from `param_values`, along with metadata cruft, + but track the associations. + """ + associations = [] + # dbkey is pushed in by the framework + if 'dbkey' in values: + del values['dbkey'] + root_values = values + + # Recursively clean data inputs and dynamic selects + def cleanup( prefix, inputs, values ): + for key, input in inputs.items(): + if isinstance( input, ( SelectToolParameter, DrillDownSelectToolParameter ) ): + if input.is_dynamic and not isinstance( values[key], UnvalidatedValue ): + values[key] = UnvalidatedValue( values[key] ) + if isinstance( input, DataToolParameter ): + tmp = values[key] + values[key] = None + # HACK: Nested associations are not yet working, but we + # still need to clean them up so we can serialize + # if not( prefix ): + if tmp: # this is false for a non-set optional dataset + if not isinstance(tmp, list): + associations.append( ( tmp.hid, prefix + key ) ) + else: + associations.extend( [ (t.hid, prefix + key) for t in tmp] ) + + # Cleanup the other deprecated crap associated with datasets + # as well. 
Worse, for nested datasets all the metadata is + # being pushed into the root. FIXME: MUST REMOVE SOON + key = prefix + key + "_" + for k in root_values.keys(): + if k.startswith( key ): + del root_values[k] + elif isinstance( input, Repeat ): + group_values = values[key] + for i, rep_values in enumerate( group_values ): + rep_index = rep_values['__index__'] + cleanup( "%s%s_%d|" % (prefix, key, rep_index ), input.inputs, group_values[i] ) + elif isinstance( input, Conditional ): + group_values = values[input.name] + current_case = group_values['__current_case__'] + cleanup( "%s%s|" % ( prefix, key ), input.cases[current_case].inputs, group_values ) + cleanup( "", inputs, values ) + return associations + +__all__ = [ summarize, extract_workflow ] diff -r 90b4baa5c2b18d49bcea5a0807f0df8924abebb1 -r 6d0c89fc3558d7185daaa1dbda05e6d15879ff00 lib/galaxy/workflow/steps.py --- /dev/null +++ b/lib/galaxy/workflow/steps.py @@ -0,0 +1,63 @@ +""" This module contains utility methods for reasoning about and ordering +workflow steps. +""" +import math +from galaxy.util.topsort import ( + CycleError, + topsort, + topsort_levels +) + + +def attach_ordered_steps( workflow, steps ): + """ Attempt to topologically order steps and attach to workflow. If this + fails - the workflow contains cycles so it mark it as such. + """ + ordered_steps = order_workflow_steps( steps ) + if ordered_steps: + workflow.has_cycles = False + for i, step in enumerate( ordered_steps ): + step.order_index = i + workflow.steps.append( step ) + else: + workflow.has_cycles = True + workflow.steps = steps + + +def order_workflow_steps( steps ): + """ + Perform topological sort of the steps, return ordered or None + """ + position_data_available = True + for step in steps: + if not step.position or not 'left' in step.position or not 'top' in step.position: + position_data_available = False + if position_data_available: + steps.sort(cmp=lambda s1, s2: cmp( math.sqrt(s1.position['left'] ** 2 + s1.position['top'] ** 2), math.sqrt(s2.position['left'] ** 2 + s2.position['top'] ** 2))) + try: + edges = edgelist_for_workflow_steps( steps ) + node_order = topsort( edges ) + return [ steps[i] for i in node_order ] + except CycleError: + return None + + +def edgelist_for_workflow_steps( steps ): + """ + Create a list of tuples representing edges between ``WorkflowSteps`` based + on associated ``WorkflowStepConnection``s + """ + edges = [] + steps_to_index = dict( ( step, i ) for i, step in enumerate( steps ) ) + for step in steps: + edges.append( ( steps_to_index[step], steps_to_index[step] ) ) + for conn in step.input_connections: + edges.append( ( steps_to_index[conn.output_step], steps_to_index[conn.input_step] ) ) + return edges + + +def order_workflow_steps_with_levels( steps ): + try: + return topsort_levels( edgelist_for_workflow_steps( steps ) ) + except CycleError: + return None diff -r 90b4baa5c2b18d49bcea5a0807f0df8924abebb1 -r 6d0c89fc3558d7185daaa1dbda05e6d15879ff00 test/functional/api/test_workflows.py --- a/test/functional/api/test_workflows.py +++ b/test/functional/api/test_workflows.py @@ -67,6 +67,39 @@ self._assert_status_code_is( run_workflow_response, 200 ) self._wait_for_history( history_id, assert_ok=True ) + def test_extract_from_history( self ): + workflow = self.workflow_populator.load_workflow( name="test_for_extract" ) + workflow_request, history_id = self._setup_workflow_run( workflow ) + contents_response = self._get( "histories/%s/contents" % history_id ) + self._assert_status_code_is( contents_response, 200 ) + 
hda_ids = map( lambda c: c[ "id" ], contents_response.json() ) + + run_workflow_response = self._post( "workflows", data=workflow_request ) + self._assert_status_code_is( run_workflow_response, 200 ) + + self._wait_for_history( history_id, assert_ok=True ) + data = dict( history_id=history_id, tool_id="cat1" ) + jobs_response = self._get( "jobs", data=data ) + self._assert_status_code_is( jobs_response, 200 ) + cat1_job_id = jobs_response.json()[ 0 ][ "id" ] + + contents_response = self._get( "history/%s/contents", data=data ) + create_from_data = dict( + from_history_id=history_id, + dataset_ids=dumps( hda_ids ), + job_ids=dumps( [ cat1_job_id ] ), + workflow_name="test import from history", + ) + run_workflow_response = self._post( "workflows", data=create_from_data ) + self._assert_status_code_is( run_workflow_response, 200 ) + + new_workflow_id = run_workflow_response.json()[ "id" ] + download_response = self._get( "workflows/%s/download" % new_workflow_id ) + self._assert_status_code_is( download_response, 200 ) + downloaded_workflow = download_response.json() + self.assertEquals( downloaded_workflow[ "name" ], "test import from history" ) + assert len( downloaded_workflow[ "steps" ] ) == 3 + def test_run_replace_params_by_tool( self ): workflow_request, history_id = self._setup_random_x2_workflow( "test_for_replace_tool_params" ) workflow_request[ "parameters" ] = dumps( dict( random_lines1=dict( num_lines=5 ) ) ) diff -r 90b4baa5c2b18d49bcea5a0807f0df8924abebb1 -r 6d0c89fc3558d7185daaa1dbda05e6d15879ff00 test/unit/workflows/test_extract_summary.py --- /dev/null +++ b/test/unit/workflows/test_extract_summary.py @@ -0,0 +1,98 @@ +import unittest + +from galaxy import model +from galaxy.workflow import extract + +UNDEFINED_JOB = object() + + +class TestWorkflowExtractSummary( unittest.TestCase ): + + def setUp( self ): + self.history = MockHistory() + self.trans = MockTrans( self.history ) + + def test_empty_history( self ): + job_dict, warnings = extract.summarize( trans=self.trans ) + assert not warnings + assert not job_dict + + def test_summarize_returns_name_and_dataset_list( self ): + # Create two jobs and three datasets, test they are groupped + # by job correctly with correct output names. + hda1 = MockHda() + self.history.active_datasets.append( hda1 ) + hda2 = MockHda( job=hda1.job, output_name="out2" ) + self.history.active_datasets.append( hda2 ) + hda3 = MockHda( output_name="out3" ) + self.history.active_datasets.append( hda3 ) + + job_dict, warnings = extract.summarize( trans=self.trans ) + assert len( job_dict ) == 2 + assert not warnings + self.assertEquals( job_dict[ hda1.job ], [ ( 'out1', hda1 ), ( 'out2', hda2 ) ] ) + self.assertEquals( job_dict[ hda3.job ], [ ( 'out3', hda3 ) ] ) + + def test_finds_original_job_if_copied( self ): + hda = MockHda() + derived_hda_1 = MockHda() + derived_hda_1.copied_from_history_dataset_association = hda + derived_hda_2 = MockHda() + derived_hda_2.copied_from_history_dataset_association = derived_hda_1 + self.history.active_datasets.append( derived_hda_2 ) + job_dict, warnings = extract.summarize( trans=self.trans ) + assert not warnings + assert len( job_dict ) == 1 + self.assertEquals( job_dict[ hda.job ], [ ('out1', derived_hda_2 ) ] ) + + def test_fake_job( self ): + """ Fakes job if creating_job_associations is empty. 
+ """ + hda = MockHda( job=UNDEFINED_JOB ) + self.history.active_datasets.append( hda ) + job_dict, warnings = extract.summarize( trans=self.trans ) + assert not warnings + assert len( job_dict ) == 1 + fake_job = job_dict.keys()[ 0 ] + assert fake_job.id.startswith( "fake_" ) + datasets = job_dict.values()[ 0 ] + assert datasets == [ ( None, hda ) ] + + def test_warns_and_skips_datasets_if_not_finished( self ): + hda = MockHda( state='queued' ) + self.history.active_datasets.append( hda ) + job_dict, warnings = extract.summarize( trans=self.trans ) + assert warnings + assert len( job_dict ) == 0 + + +class MockHistory( object ): + + def __init__( self ): + self.active_datasets = [] + + +class MockTrans( object ): + + def __init__( self, history ): + self.history = history + + def get_history( self ): + return self.history + + +class MockHda( object ): + + def __init__( self, state='ok', output_name='out1', job=None ): + self.id = 123 + self.state = state + self.copied_from_history_dataset_association = None + if job is not UNDEFINED_JOB: + if not job: + job = model.Job() + self.job = job + assoc = model.JobToOutputDatasetAssociation( output_name, self ) + assoc.job = job + self.creating_job_associations = [ assoc ] + else: + self.creating_job_associations = [] Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.