details: http://www.bx.psu.edu/hg/galaxy/rev/2f3b76827743 changeset: 3456:2f3b76827743 user: James Taylor <james@jamestaylor.org> date: Thu Feb 11 00:17:04 2010 -0500 description: [mq] workflow-invocation: Track workflow invocations in database, provide a view of the history grouped by workflow and job diffstat: lib/galaxy/model/__init__.py | 6 + lib/galaxy/model/mapping.py | 68 +++++++--- lib/galaxy/model/migrate/versions/0041_workflow_invocation.py | 52 ++++++++ lib/galaxy/tools/__init__.py | 2 +- lib/galaxy/tools/actions/__init__.py | 6 +- lib/galaxy/tools/actions/metadata.py | 2 +- lib/galaxy/tools/actions/upload.py | 2 +- lib/galaxy/tools/actions/upload_common.py | 7 +- lib/galaxy/web/controllers/history.py | 67 ++++++++++ lib/galaxy/web/controllers/workflow.py | 19 ++- lib/galaxy/workflow/modules.py | 2 +- scripts/manage_db.py | 1 + templates/root/index.mako | 3 + 13 files changed, 204 insertions(+), 33 deletions(-) diffs (426 lines): diff -r c48de7f12e58 -r 2f3b76827743 lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py Mon Mar 01 17:47:28 2010 -0500 +++ b/lib/galaxy/model/__init__.py Thu Feb 11 00:17:04 2010 -0500 @@ -1217,6 +1217,12 @@ self.user = None self.order_index = None +class WorkflowInvocation( object ): + pass + +class WorkflowInvocationStep( object ): + pass + class MetadataFile( object ): def __init__( self, dataset = None, name = None ): if isinstance( dataset, HistoryDatasetAssociation ): diff -r c48de7f12e58 -r 2f3b76827743 lib/galaxy/model/mapping.py --- a/lib/galaxy/model/mapping.py Mon Mar 01 17:47:28 2010 -0500 +++ b/lib/galaxy/model/mapping.py Thu Feb 11 00:17:04 2010 -0500 @@ -558,6 +558,22 @@ Column( "input_name", TEXT) ) +WorkflowInvocation.table = Table( "workflow_invocation", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "workflow_id", Integer, ForeignKey( "workflow.id" ), index=True, nullable=False ) + ) + +WorkflowInvocationStep.table = Table( "workflow_invocation_step", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "workflow_invocation_id", Integer, ForeignKey( "workflow_invocation.id" ), index=True, nullable=False ), + Column( "workflow_step_id", Integer, ForeignKey( "workflow_step.id" ), index=True, nullable=False ), + Column( "job_id", Integer, ForeignKey( "job.id" ), index=True, nullable=False ) + ) + StoredWorkflowUserShareAssociation.table = Table( "stored_workflow_user_share_connection", metadata, Column( "id", Integer, primary_key=True ), Column( "stored_workflow_id", Integer, ForeignKey( "stored_workflow.id" ), index=True ), @@ -1215,27 +1231,6 @@ HistoryDatasetAssociation.mapper.add_property( "creating_job_associations", relation( JobToOutputDatasetAssociation ) ) -assign_mapper( context, Workflow, Workflow.table, - properties=dict( steps=relation( WorkflowStep, backref='workflow', - order_by=asc(WorkflowStep.table.c.order_index), - cascade="all, delete-orphan", - lazy=False ), - tags=relation(WorkflowTagAssociation, order_by=WorkflowTagAssociation.table.c.id, backref="workflows") - ) ) - - -assign_mapper( context, WorkflowStep, WorkflowStep.table, - properties=dict( - tags=relation(WorkflowStepTagAssociation, order_by=WorkflowStepTagAssociation.table.c.id, backref="workflow_steps"), - annotations=relation( WorkflowStepAnnotationAssociation, 
order_by=WorkflowStepAnnotationAssociation.table.c.id, backref="workflow_steps" ) ) - ) - -assign_mapper( context, WorkflowStepConnection, WorkflowStepConnection.table, - properties=dict( input_step=relation( WorkflowStep, backref="input_connections", cascade="all", - primaryjoin=( WorkflowStepConnection.table.c.input_step_id == WorkflowStep.table.c.id ) ), - output_step=relation( WorkflowStep, backref="output_connections", cascade="all", - primaryjoin=( WorkflowStepConnection.table.c.output_step_id == WorkflowStep.table.c.id ) ) ) ) - # vvvvvvvvvvvvvvvv Start cloud table mappings vvvvvvvvvvvvvvvv assign_mapper( context, CloudImage, CloudImage.table ) @@ -1272,6 +1267,27 @@ ) ) # ^^^^^^^^^^^^^^^ End cloud table mappings ^^^^^^^^^^^^^^^^^^ +assign_mapper( context, Workflow, Workflow.table, + properties=dict( steps=relation( WorkflowStep, backref='workflow', + order_by=asc(WorkflowStep.table.c.order_index), + cascade="all, delete-orphan", + lazy=False ), + tags=relation(WorkflowTagAssociation, order_by=WorkflowTagAssociation.table.c.id, backref="workflows") + ) ) + +assign_mapper( context, WorkflowStep, WorkflowStep.table, + properties=dict( + tags=relation(WorkflowStepTagAssociation, order_by=WorkflowStepTagAssociation.table.c.id, backref="workflow_steps"), + annotations=relation( WorkflowStepAnnotationAssociation, order_by=WorkflowStepAnnotationAssociation.table.c.id, backref="workflow_steps" ) ) + ) + +assign_mapper( context, WorkflowStepConnection, WorkflowStepConnection.table, + properties=dict( input_step=relation( WorkflowStep, backref="input_connections", cascade="all", + primaryjoin=( WorkflowStepConnection.table.c.input_step_id == WorkflowStep.table.c.id ) ), + output_step=relation( WorkflowStep, backref="output_connections", cascade="all", + primaryjoin=( WorkflowStepConnection.table.c.output_step_id == WorkflowStep.table.c.id ) ) ) ) + + assign_mapper( context, StoredWorkflow, StoredWorkflow.table, properties=dict( user=relation( User ), workflows=relation( Workflow, backref='stored_workflow', @@ -1297,6 +1313,16 @@ assign_mapper( context, StoredWorkflowMenuEntry, StoredWorkflowMenuEntry.table, properties=dict( stored_workflow=relation( StoredWorkflow ) ) ) +assign_mapper( context, WorkflowInvocation, WorkflowInvocation.table, + properties=dict( + steps=relation( WorkflowInvocationStep, backref='workflow_invocation', lazy=False ), + workflow=relation( Workflow ) ) ) + +assign_mapper( context, WorkflowInvocationStep, WorkflowInvocationStep.table, + properties=dict( + workflow_step = relation( WorkflowStep ), + job = relation( Job, backref=backref( 'workflow_invocation_step', uselist=False ) ) ) ) + assign_mapper( context, MetadataFile, MetadataFile.table, properties=dict( history_dataset=relation( HistoryDatasetAssociation ), library_dataset=relation( LibraryDatasetDatasetAssociation ) ) ) diff -r c48de7f12e58 -r 2f3b76827743 lib/galaxy/model/migrate/versions/0041_workflow_invocation.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/galaxy/model/migrate/versions/0041_workflow_invocation.py Thu Feb 11 00:17:04 2010 -0500 @@ -0,0 +1,52 @@ +""" +Migration script to create tables for tracking workflow invocations. 
+""" + +from sqlalchemy import * +from sqlalchemy.orm import * +from migrate import * +from migrate.changeset import * + +import logging +logging.basicConfig( level=logging.DEBUG ) +log = logging.getLogger( __name__ ) + +import datetime +now = datetime.datetime.utcnow + +metadata = MetaData( migrate_engine ) +db_session = scoped_session( sessionmaker( bind=migrate_engine, autoflush=False, autocommit=True ) ) + +WorkflowInvocation_table = Table( "workflow_invocation", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "workflow_id", Integer, ForeignKey( "workflow.id" ), index=True, nullable=False ) + ) + +WorkflowInvocationStep_table = Table( "workflow_invocation_step", metadata, + Column( "id", Integer, primary_key=True ), + Column( "create_time", DateTime, default=now ), + Column( "update_time", DateTime, default=now, onupdate=now ), + Column( "workflow_invocation_id", Integer, ForeignKey( "workflow_invocation.id" ), index=True, nullable=False ), + Column( "workflow_step_id", Integer, ForeignKey( "workflow_step.id" ), index=True, nullable=False ), + Column( "job_id", Integer, ForeignKey( "job.id" ), index=True, nullable=False ) + ) + +tables = [ WorkflowInvocation_table, WorkflowInvocationStep_table ] + +def upgrade(): + print __doc__ + metadata.reflect() + + for table in tables: + try: + table.create() + except: + log.warn( "Failed to create table '%s', ignoring (might result in wrong schema)" % table.name ) + +def downgrade(): + metadata.reflect() + + for table in tables: + table.drop() \ No newline at end of file diff -r c48de7f12e58 -r 2f3b76827743 lib/galaxy/tools/__init__.py --- a/lib/galaxy/tools/__init__.py Mon Mar 01 17:47:28 2010 -0500 +++ b/lib/galaxy/tools/__init__.py Thu Feb 11 00:17:04 2010 -0500 @@ -789,7 +789,7 @@ return "tool_form.mako", dict( errors=errors, tool_state=state, incoming=incoming, error_message=error_message ) # If we've completed the last page we can execute the tool elif state.page == self.last_page: - out_data = self.execute( trans, incoming=params ) + _, out_data = self.execute( trans, incoming=params ) try: assert isinstance( out_data, odict ) return 'tool_executed.mako', dict( out_data=out_data ) diff -r c48de7f12e58 -r 2f3b76827743 lib/galaxy/tools/actions/__init__.py --- a/lib/galaxy/tools/actions/__init__.py Mon Mar 01 17:47:28 2010 -0500 +++ b/lib/galaxy/tools/actions/__init__.py Thu Feb 11 00:17:04 2010 -0500 @@ -17,7 +17,7 @@ The actions to be taken when a tool is run (after parameters have been converted and validated). 
""" - def execute( self, tool, trans, incoming={} ): + def execute( self, tool, trans, incoming={}, set_output_hid=True ): raise TypeError("Abstract method") class DefaultToolAction( object ): @@ -101,7 +101,7 @@ tool.visit_inputs( param_values, visitor ) return input_datasets - def execute(self, tool, trans, incoming={}, set_output_hid=True ): + def execute(self, tool, trans, incoming={}, return_job=False, set_output_hid=True ): def make_dict_copy( from_dict ): """ Makes a copy of input dictionary from_dict such that all values that are dictionaries @@ -332,4 +332,4 @@ # Queue the job for execution trans.app.job_queue.put( job.id, tool ) trans.log_event( "Added job to the job queue, id: %s" % str(job.id), tool_id=job.tool_id ) - return out_data + return job, out_data diff -r c48de7f12e58 -r 2f3b76827743 lib/galaxy/tools/actions/metadata.py --- a/lib/galaxy/tools/actions/metadata.py Mon Mar 01 17:47:28 2010 -0500 +++ b/lib/galaxy/tools/actions/metadata.py Thu Feb 11 00:17:04 2010 -0500 @@ -60,4 +60,4 @@ # Queue the job for execution trans.app.job_queue.put( job.id, tool ) trans.log_event( "Added set external metadata job to the job queue, id: %s" % str(job.id), tool_id=job.tool_id ) - return odict() + return job, odict() diff -r c48de7f12e58 -r 2f3b76827743 lib/galaxy/tools/actions/upload.py --- a/lib/galaxy/tools/actions/upload.py Mon Mar 01 17:47:28 2010 -0500 +++ b/lib/galaxy/tools/actions/upload.py Thu Feb 11 00:17:04 2010 -0500 @@ -23,4 +23,4 @@ json_file_path = upload_common.create_paramfile( trans, uploaded_datasets ) data_list = [ ud.data for ud in uploaded_datasets ] - return upload_common.create_job( trans, incoming, tool, json_file_path, data_list ) + return upload_common.create_job( trans, incoming, tool, json_file_path, data_list, return_job=True ) diff -r c48de7f12e58 -r 2f3b76827743 lib/galaxy/tools/actions/upload_common.py --- a/lib/galaxy/tools/actions/upload_common.py Mon Mar 01 17:47:28 2010 -0500 +++ b/lib/galaxy/tools/actions/upload_common.py Thu Feb 11 00:17:04 2010 -0500 @@ -292,7 +292,7 @@ json_file.write( to_json_string( json ) + '\n' ) json_file.close() return json_file_path -def create_job( trans, params, tool, json_file_path, data_list, folder=None ): +def create_job( trans, params, tool, json_file_path, data_list, folder=None, return_job=False ): """ Create the upload job. """ @@ -329,7 +329,10 @@ output = odict() for i, v in enumerate( data_list ): output[ 'output%i' % i ] = v - return output + if return_job: + return job, output + else: + return output def active_folders( trans, folder ): # Stolen from galaxy.web.controllers.library_common (importing from which causes a circular issues). # Much faster way of retrieving all active sub-folders within a given folder than the diff -r c48de7f12e58 -r 2f3b76827743 lib/galaxy/web/controllers/history.py --- a/lib/galaxy/web/controllers/history.py Mon Mar 01 17:47:28 2010 -0500 +++ b/lib/galaxy/web/controllers/history.py Thu Feb 11 00:17:04 2010 -0500 @@ -295,6 +295,7 @@ trans.set_history( new_history ) # No message return None, None + @web.expose @web.require_login( "work with shared histories" ) def list_shared( self, trans, **kwargs ): @@ -329,6 +330,70 @@ status = 'done' # Render the list view return self.shared_list_grid( trans, status=status, message=message, **kwargs ) + + @web.expose + def display_structured( self, trans, id=None ): + """ + Display a history as a nested structure showing the jobs and workflow + invocations that created each dataset (if any). 
diff -r c48de7f12e58 -r 2f3b76827743 lib/galaxy/web/controllers/history.py
--- a/lib/galaxy/web/controllers/history.py	Mon Mar 01 17:47:28 2010 -0500
+++ b/lib/galaxy/web/controllers/history.py	Thu Feb 11 00:17:04 2010 -0500
@@ -295,6 +295,7 @@
        trans.set_history( new_history )
        # No message
        return None, None
+
    @web.expose
    @web.require_login( "work with shared histories" )
    def list_shared( self, trans, **kwargs ):
@@ -329,6 +330,70 @@
            status = 'done'
        # Render the list view
        return self.shared_list_grid( trans, status=status, message=message, **kwargs )
+
+    @web.expose
+    def display_structured( self, trans, id=None ):
+        """
+        Display a history as a nested structure showing the jobs and workflow
+        invocations that created each dataset (if any).
+        """
+        # Get history
+        if id is None:
+            history = trans.history
+        else:
+            id = trans.security.decode_id( id )
+            history = trans.sa_session.query( model.History ).get( id )
+        assert history
+        assert history.user and ( history.user == trans.user ) or ( history == trans.history )
+        # Resolve jobs and workflow invocations for the datasets in the history.
+        # items is filled with items (hdas, jobs, or workflows) that go at the
+        # top level
+        items = []
+        # First go through and group hdas by job; if there is no job they get
+        # added directly to items
+        jobs = dict()
+        for hda in history.active_datasets:
+            # Follow "copied from ..." association until we get to the original
+            # instance of the dataset
+            original_hda = hda
+            while original_hda.copied_from_history_dataset_association:
+                original_hda = original_hda.copied_from_history_dataset_association
+            # Check if the dataset has a creating job; most should, but datasets
+            # from before jobs were tracked, or from the upload tool before it
+            # created a job, may not
+            if not original_hda.creating_job_associations:
+                items.append( ( hda, None ) )
+            # Attach hda to correct job
+            # -- there should only be one creating_job_association, so this
+            #    loop body should only be hit once
+            for assoc in original_hda.creating_job_associations:
+                job = assoc.job
+                if job in jobs:
+                    jobs[ job ].append( ( hda, None ) )
+                else:
+                    jobs[ job ] = [ ( hda, None ) ]
+        # Second, go through the jobs and connect to workflows
+        wf_invocations = dict()
+        for job, hdas in jobs.iteritems():
+            # Job is attached to a workflow step, follow it to the
+            # workflow_invocation and group
+            if job.workflow_invocation_step:
+                wf_invocation = job.workflow_invocation_step.workflow_invocation
+                if wf_invocation in wf_invocations:
+                    wf_invocations[ wf_invocation ].append( ( job, hdas ) )
+                else:
+                    wf_invocations[ wf_invocation ] = [ ( job, hdas ) ]
+            # Not attached to a workflow, add to items
+            else:
+                items.append( ( job, hdas ) )
+        # Finally, add workflow invocations to items, which should now
+        # contain all hdas with some level of grouping
+        items.extend( wf_invocations.items() )
+        # Sort items by creation time, newest first
+        items.sort( key=( lambda x: x[0].create_time ), reverse=True )
+        return trans.fill_template( "history/display_structured.mako", items=items )
+
    @web.expose
    def delete_current( self, trans ):
        """Delete just the active history -- this does not require a logged in user."""
@@ -343,6 +408,7 @@
        # Regardless of whether it was previously deleted, we make a new history active
        trans.new_history()
        return trans.show_ok_message( "History deleted, a new history is active", refresh_frames=['history'] )
+
    @web.expose
    def rename_async( self, trans, id=None, new_name=None ):
        history = self.get_history( trans, id )
@@ -493,6 +559,7 @@
        Warning! If you import this history, you will lose your current history. Click <a href="%s">here</a> to confirm.
        """ % web.url_for( id=id, confirm=True ) )
+
    @web.expose
    def view( self, trans, id=None ):
        """View a history. If a history is importable, then it is viewable by any user."""
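The grouping performed by display_structured() reduces to two dictionary passes: datasets are grouped under their creating job, jobs under their workflow invocation, and anything unattached stays at the top level. A self-contained sketch of the same algorithm, using toy namedtuples in place of HDAs, Jobs, and WorkflowInvocations:

    # Toy version of the display_structured grouping above.
    from collections import namedtuple

    Dataset = namedtuple( 'Dataset', 'name job' )    # job may be None
    Job = namedtuple( 'Job', 'name invocation' )     # invocation may be None

    def group_structured( datasets ):
        items, jobs = [], {}
        for ds in datasets:
            if ds.job is None:
                items.append( ( ds, None ) )         # no creating job: top level
            else:
                jobs.setdefault( ds.job, [] ).append( ( ds, None ) )
        invocations = {}
        for job, dss in jobs.items():
            if job.invocation is None:
                items.append( ( job, dss ) )         # job ran outside a workflow
            else:
                invocations.setdefault( job.invocation, [] ).append( ( job, dss ) )
        items.extend( invocations.items() )
        return items

    j1, j2 = Job( 'job1', 'invocation-1' ), Job( 'job2', None )
    data = [ Dataset( 'a', j1 ), Dataset( 'b', j1 ), Dataset( 'c', j2 ), Dataset( 'd', None ) ]
    print( group_structured( data ) )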
diff -r c48de7f12e58 -r 2f3b76827743 lib/galaxy/web/controllers/workflow.py
--- a/lib/galaxy/web/controllers/workflow.py	Mon Mar 01 17:47:28 2010 -0500
+++ b/lib/galaxy/web/controllers/workflow.py	Thu Feb 11 00:17:04 2010 -0500
@@ -865,8 +865,12 @@
                    errors[step.id] = state.inputs["__errors__"] = step_errors
        if 'run_workflow' in kwargs and not errors:
            # Run each step, connecting outputs to inputs
+            workflow_invocation = model.WorkflowInvocation()
+            workflow_invocation.workflow = workflow
            outputs = odict()
            for i, step in enumerate( workflow.steps ):
+                # Execute module
+                job = None
                if step.type == 'tool' or step.type is None:
                    tool = trans.app.toolbox.tools_by_id[ step.tool_id ]
                    input_values = step.state.inputs
@@ -878,10 +882,19 @@
                            return outputs[ conn.output_step.id ][ conn.output_name ]
                    visit_input_values( tool.inputs, step.state.inputs, callback )
                    # Execute it
-                    outputs[ step.id ] = tool.execute( trans, step.state.inputs )
+                    job, out_data = tool.execute( trans, step.state.inputs )
+                    outputs[ step.id ] = out_data
                else:
-                    outputs[ step.id ] = step.module.execute( trans, step.state )
-
+                    job, out_data = step.module.execute( trans, step.state )
+                    outputs[ step.id ] = out_data
+                # Record invocation
+                workflow_invocation_step = model.WorkflowInvocationStep()
+                workflow_invocation_step.workflow_invocation = workflow_invocation
+                workflow_invocation_step.workflow_step = step
+                workflow_invocation_step.job = job
+            # All jobs ran successfully, so we can save now
+            trans.sa_session.add( workflow_invocation )
+            trans.sa_session.flush()
            return trans.fill_template( "workflow/run_complete.mako", workflow=stored, outputs=outputs )
diff -r c48de7f12e58 -r 2f3b76827743 lib/galaxy/workflow/modules.py
--- a/lib/galaxy/workflow/modules.py	Mon Mar 01 17:47:28 2010 -0500
+++ b/lib/galaxy/workflow/modules.py	Thu Feb 11 00:17:04 2010 -0500
@@ -153,7 +153,7 @@
        return errors

    def execute( self, trans, state ):
-        return dict( output=state.inputs['input'])
+        return None, dict( output=state.inputs['input'])

class ToolModule( WorkflowModule ):
diff -r c48de7f12e58 -r 2f3b76827743 scripts/manage_db.py
--- a/scripts/manage_db.py	Mon Mar 01 17:47:28 2010 -0500
+++ b/scripts/manage_db.py	Thu Feb 11 00:17:04 2010 -0500
@@ -12,6 +12,7 @@
from migrate.versioning.shell import main
from ConfigParser import SafeConfigParser
+
log = logging.getLogger( __name__ )
cp = SafeConfigParser()
diff -r c48de7f12e58 -r 2f3b76827743 templates/root/index.mako
--- a/templates/root/index.mako	Mon Mar 01 17:47:28 2010 -0500
+++ b/templates/root/index.mako	Thu Feb 11 00:17:04 2010 -0500
@@ -35,6 +35,9 @@
        "Show Deleted Datasets": function() {
            galaxy_history.location = "${h.url_for( controller='root', action='history', show_deleted=True)}";
        },
+        "Show structure": function() {
+            galaxy_main.location = "${h.url_for( controller='history', action='display_structured' )}";
+        },
        "Delete": function() {
            if ( confirm( "Really delete the current history?" ) )
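Taken together, the new workflow_invocation and workflow_invocation_step tables make a workflow run a queryable object: invocation, then steps, then ( workflow_step, job ). A hedged sketch of the kind of traversal the new mappings support; invocation_summary is a hypothetical helper, sa_session is assumed to be a configured Galaxy SQLAlchemy session, and Job.state is assumed to be the usual job-state column:

    # Illustrative query against the tables added by this changeset.
    from galaxy import model

    def invocation_summary( sa_session, workflow_id ):
        """Yield ( invocation, [ ( step_order, job_state ) ] ) for one workflow."""
        invocations = sa_session.query( model.WorkflowInvocation ) \
                                .filter_by( workflow_id=workflow_id ) \
                                .order_by( model.WorkflowInvocation.table.c.create_time.desc() )
        for inv in invocations:
            # inv.steps is eager-loaded ( lazy=False in the mapping ); each step
            # carries its WorkflowStep and the Job that executed it.
            yield inv, [ ( s.workflow_step.order_index, s.job.state ) for s in inv.steps ]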