galaxy-dist commit 1d05df35ec27: Include job information when exporting and importing histories to/from files. This enables jobs associated with an imported history to be rerun, allowing a history's analyses to be reproduced exactly. Jobs also have a new flag, imported, which is false by default and set to true when a job is created via import. The export/import code has also been refactored to use custom JSON encoders.
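As background for the encoder refactoring described above, the patch serializes HistoryDatasetAssociations (HDAs) with a custom JSONEncoder on export and resolves them back to objects via an object_hook on import. The following is a minimal, self-contained sketch of that pattern only; it uses the standard json module and a hypothetical FakeHDA stand-in, not Galaxy's simplejson-based controller code, and the names FakeHDA, HDAEncoder, and as_hda are illustrative.

    import json

    class FakeHDA( object ):
        """ Minimal stand-in for an HDA; only the fields used below. """
        def __init__( self, hid, name, extension ):
            self.hid = hid
            self.name = name
            self.extension = extension

    class HDAEncoder( json.JSONEncoder ):
        """ Encode an HDA-like object as a tagged dict; defer to default encoding otherwise. """
        def default( self, obj ):
            if isinstance( obj, FakeHDA ):
                return { "__HistoryDatasetAssociation__": True,
                         "hid": obj.hid,
                         "name": obj.name,
                         "extension": obj.extension }
            return json.JSONEncoder.default( self, obj )

    def as_hda( dct, lookup ):
        """ Decoder hook: turn a tagged dict back into an object via a hid -> object lookup. """
        if dct.get( "__HistoryDatasetAssociation__", False ):
            return lookup[ dct[ "hid" ] ]
        return dct

    if __name__ == "__main__":
        hda = FakeHDA( 3, "mapped reads", "bam" )
        payload = json.dumps( { "params": { "input1": hda } }, cls=HDAEncoder )
        restored = json.loads( payload, object_hook=lambda d: as_hda( d, { 3: hda } ) )
        print( restored[ "params" ][ "input1" ].name )   # -> mapped reads

The same idea appears twice in the patch: the export side tags each HDA with "__HistoryDatasetAssociation__", and the import side looks the object up again by hid within the newly created history.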
# HG changeset patch -- Bitbucket.org
# Project galaxy-dist
# URL http://bitbucket.org/galaxy/galaxy-dist/overview
# User jeremy goecks <jeremy.goecks@emory.edu>
# Date 1277866995 14400
# Node ID 1d05df35ec27cc606ddc486c83ea090e01a441d8
# Parent 47411bb09df44719760aac61e4b90de4fd190aad
Include job information when exporting and importing histories to/from files. This enables jobs associated with an imported history to be rerun and hence affords a history's analyses to be reproduced exactly. Jobs also have a new flag, imported, that is set to false by default but is true when a job is created via import. Also refactored export/import code to use custom JSON encoders.

This code is in a very alpha/beta state: it may not work well, at all, or as expected. Complex datasets and/or jobs are likely not yet handled. Use with caution. Also, due to security issues, history/import and history/export methods are not currently web-accessible. Admins must manually make these methods accessible if they want to use them.

--- a/templates/root/index.mako
+++ b/templates/root/index.mako
@@ -40,9 +40,9 @@
         "Show structure": function() {
             galaxy_main.location = "${h.url_for( controller='history', action='display_structured' )}";
         },
-        "Export to File": function() {
-            galaxy_main.location = "${h.url_for( controller='history', action='export_archive' )}";
-        },
+        ##"Export to File": function() {
+        ##    galaxy_main.location = "${h.url_for( controller='history', action='export_archive' )}";
+        ##},
         "Delete": function() {
             if ( confirm( "Really delete the current history?" ) )
@@ -50,10 +50,10 @@
                 galaxy_main.location = "${h.url_for( controller='history', action='delete_current' )}";
             }
         },
-        "Other Actions": null,
-        "Import from File": function() {
-            galaxy_main.location = "${h.url_for( controller='history', action='import_archive' )}";
-        }
+        ##"Other Actions": null,
+        ##"Import from File": function() {
+        ##    galaxy_main.location = "${h.url_for( controller='history', action='import_archive' )}";
+        ##}
         });

         // Init tool options.
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -115,6 +115,7 @@ class Job( object ):
         self.job_runner_name = None
         self.job_runner_external_id = None
         self.post_job_actions = None
+        self.imported = False

     def add_parameter( self, name, value ):
         self.parameters.append( JobParameter( name, value ) )
--- /dev/null
+++ b/lib/galaxy/model/migrate/versions/0051_imported_col_for_jobs_table.py
@@ -0,0 +1,48 @@
+"""
+Migration script to add imported column for jobs table.
+"""
+
+from sqlalchemy import *
+from sqlalchemy.orm import *
+from migrate import *
+from migrate.changeset import *
+
+import logging
+log = logging.getLogger( __name__ )
+
+metadata = MetaData( migrate_engine )
+db_session = scoped_session( sessionmaker( bind=migrate_engine, autoflush=False, autocommit=True ) )
+
+def upgrade():
+    print __doc__
+    metadata.reflect()
+
+    # Create and initialize imported column in job table.
+    Jobs_table = Table( "job", metadata, autoload=True )
+    c = Column( "imported", Boolean, default=False, index=True )
+    try:
+        # Create
+        c.create( Jobs_table )
+        assert c is Jobs_table.c.imported
+
+        # Initialize.
+        if migrate_engine.name == 'mysql' or migrate_engine.name == 'sqlite':
+            default_false = "0"
+        elif migrate_engine.name == 'postgres':
+            default_false = "false"
+        db_session.execute( "UPDATE job SET imported=%s" % default_false )
+
+    except Exception, e:
+        print "Adding imported column to job table failed: %s" % str( e )
+        log.debug( "Adding imported column to job table failed: %s" % str( e ) )
+
+def downgrade():
+    metadata.reflect()
+
+    # Drop imported column from job table.
+    Jobs_table = Table( "job", metadata, autoload=True )
+    try:
+        Jobs_table.c.imported.drop()
+    except Exception, e:
+        print "Dropping column imported from job table failed: %s" % str( e )
+        log.debug( "Dropping column imported from job table failed: %s" % str( e ) )
--- a/lib/galaxy/web/controllers/history.py
+++ b/lib/galaxy/web/controllers/history.py
@@ -6,6 +6,7 @@
 from galaxy.model.mapping import desc
 from galaxy.model.orm import *
 from galaxy.util.json import *
 from galaxy.util.sanitize_html import sanitize_html
+from galaxy.tools.parameters.basic import UnvalidatedValue
 from galaxy.tools.actions import upload_common
 from galaxy.tags.tag_handler import GalaxyTagHandler
 from sqlalchemy.sql.expression import ClauseElement
@@ -145,7 +146,7 @@ class HistoryAllPublishedGrid( grids.Gri
     def apply_query_filter( self, trans, query, **kwargs ):
         # A public history is published, has a slug, and is not deleted.
         return query.filter( self.model_class.published == True ).filter( self.model_class.slug != None ).filter( self.model_class.deleted == False )
-
+
 class HistoryController( BaseController, Sharable, UsesAnnotations, UsesHistory ):
     @web.expose
     def index( self, trans ):
@@ -443,26 +444,26 @@ class HistoryController,
         trans.sa_session.flush()
         return new_annotation
-
     @web.expose
     def import_archive( self, trans, archived_history=None ):
         """ Import a history. """
         if archived_history is not None:
-            # Import archived history.
             try:
                 history_archive_file = tarfile.open( archived_history.file.name )
-
+
                 # Security check: make sure that members are relative, not absolute.
                 for tarinfo in history_archive_file.getmembers():
                     if tarinfo.name.startswith("/") or tarinfo.name.find("..") != -1:
                         return trans.show_error_message( 'Error importing history archive: archive file is invalid.' )
-
+
                 # Unpack archive in temporary directory.
                 temp_output_dir = tempfile.mkdtemp()
                 history_archive_file.extractall( path=temp_output_dir )
                 history_archive_file.close()
-
-                # Read history attributes.
+
+                #
+                # Create history.
+                #
                 history_attr_in = open( '%s/%s' % ( temp_output_dir, 'history_attrs.txt'), 'rb' )
                 history_attr_str = ''
                 buffsize = 1048576
@@ -473,31 +474,33 @@ class HistoryController,
                             break
                     except OverflowError:
                         pass
+                history_attr_in.close()
                 history_attrs = from_json_string( history_attr_str )
-
+
                 # Create history.
                 new_history = model.History( name='imported from archive: %s' % history_attrs['name'].encode( 'utf-8' ), user=trans.user )
                 trans.sa_session.add( new_history )
-
+
+                new_history.hid_counter = history_attrs['hid_counter']
+                new_history.genome_build = history_attrs['genome_build']
+                trans.sa_session.flush()
+
                 # Builds a tag string for a tag, value pair.
                 def get_tag_str( tag, value ):
                     if not value:
                         return tag
                     else:
                         return tag + ":" + value
-
+
                 # Add annotation, tags.
                 if trans.user:
                     self.add_item_annotation( trans, new_history, history_attrs[ 'annotation' ] )
                     for tag, value in history_attrs[ 'tags' ].items():
                         trans.app.tag_handler.apply_item_tags( trans, trans.user, new_history, get_tag_str( tag, value ) )
-
-                # Ignore hid_counter since it artificially increases the hid for all HDAs?
-                # new_history.hid_counter = history_attrs['hid_counter']
-                new_history.genome_build = history_attrs['genome_build']
-                trans.sa_session.flush()
-
-                # Read datasets attributes.
+
+                #
+                # Create datasets.
+                #
                 datasets_attr_in = open( '%s/%s' % ( temp_output_dir, 'datasets_attrs.txt'), 'rb' )
                 datasets_attr_str = ''
                 buffsize = 1048576
@@ -508,64 +511,187 @@ class HistoryController,
                             break
                    except OverflowError:
                         pass
+                datasets_attr_in.close()
                 datasets_attrs = from_json_string( datasets_attr_str )
-
-                # Create datasets.
+
+                # Create datasets.
                 for dataset_attrs in datasets_attrs:
                     metadata = dataset_attrs['metadata']
-
+
                     # Create dataset and HDA.
-                    hda = trans.app.model.HistoryDatasetAssociation( name = dataset_attrs['name'].encode( 'utf-8' ),
-                                                                     extension = dataset_attrs['extension'],
-                                                                     hid = dataset_attrs['hid'],
-                                                                     info = dataset_attrs['info'].encode( 'utf-8' ),
-                                                                     blurb = dataset_attrs['blurb'],
-                                                                     peek = dataset_attrs['peek'],
-                                                                     designation = dataset_attrs['designation'],
-                                                                     visible = dataset_attrs['visible'],
-                                                                     dbkey = metadata['dbkey'],
-                                                                     metadata = metadata,
-                                                                     history = new_history,
-                                                                     create_dataset = True,
-                                                                     sa_session = trans.sa_session )
+                    hda = model.HistoryDatasetAssociation( name = dataset_attrs['name'].encode( 'utf-8' ),
+                                                           extension = dataset_attrs['extension'],
+                                                           info = dataset_attrs['info'].encode( 'utf-8' ),
+                                                           blurb = dataset_attrs['blurb'],
+                                                           peek = dataset_attrs['peek'],
+                                                           designation = dataset_attrs['designation'],
+                                                           visible = dataset_attrs['visible'],
+                                                           dbkey = metadata['dbkey'],
+                                                           metadata = metadata,
+                                                           history = new_history,
+                                                           create_dataset = True,
+                                                           sa_session = trans.sa_session )
                     hda.state = hda.states.OK
                     trans.sa_session.add( hda )
                     trans.sa_session.flush()
                     new_history.add_dataset( hda, genome_build = None )
+                    hda.hid = dataset_attrs['hid']  # Overwrite default hid set when HDA added to history.
                     permissions = trans.app.security_agent.history_get_default_permissions( new_history )
                     trans.app.security_agent.set_all_dataset_permissions( hda.dataset, permissions )
                     trans.sa_session.flush()
-
+
                     # Copy dataset data.
                     temp_dataset_name = '%s/datasets/%s' % ( temp_output_dir, dataset_attrs['file_name'] )
                     shutil.copyfile( temp_dataset_name, hda.file_name )
-
+
                     # Set tags, annotations.
                     if trans.user:
                         self.add_item_annotation( trans, hda, dataset_attrs[ 'annotation' ] )
                         for tag, value in dataset_attrs[ 'tags' ].items():
                             trans.app.tag_handler.apply_item_tags( trans, trans.user, hda, get_tag_str( tag, value ) )
-                            trans.sa_session.flush()
+                    trans.sa_session.flush()
+                #
+                # Create jobs.
+                #
+
+                # Read jobs attributes.
+                jobs_attr_in = open( '%s/%s' % ( temp_output_dir, 'jobs_attrs.txt'), 'rb' )
+                jobs_attr_str = ''
+                buffsize = 1048576
+                try:
+                    while True:
+                        jobs_attr_str += jobs_attr_in.read( buffsize )
+                        if not jobs_attr_str or len( jobs_attr_str ) % buffsize != 0:
+                            break
+                except OverflowError:
+                    pass
+                jobs_attr_in.close()
+
+                # Decode jobs attributes.
+                def as_hda( obj_dct ):
+                    """ Hook to 'decode' an HDA; method uses history and HID to get the HDA represented by
+                        the encoded object. This only works because HDAs are created above.
""" + if obj_dct.get( '__HistoryDatasetAssociation__', False ): + return trans.sa_session.query( model.HistoryDatasetAssociation ) \ + .filter_by( history=new_history, hid=obj_dct['hid'] ).first() + return obj_dct + jobs_attrs = from_json_string( jobs_attr_str, object_hook=as_hda ) + + # Create each job. + for job_attrs in jobs_attrs: + imported_job = model.Job() + imported_job.user = trans.user + imported_job.session = trans.get_galaxy_session().id + imported_job.history = new_history + imported_job.tool_id = job_attrs[ 'tool_id' ] + imported_job.tool_version = job_attrs[ 'tool_version' ] + imported_job.set_state( job_attrs[ 'state' ] ) + imported_job.imported = True + trans.sa_session.add( imported_job ) + trans.sa_session.flush() + + class HistoryDatasetAssociationIDEncoder( simplejson.JSONEncoder ): + """ Custom JSONEncoder for a HistoryDatasetAssociation that encodes an HDA as its ID. """ + def default( self, obj ): + """ Encode an HDA, default encoding for everything else. """ + if isinstance( obj, model.HistoryDatasetAssociation ): + return obj.id + return simplejson.JSONEncoder.default( self, obj ) + + # Set parameters. May be useful to look at metadata.py for creating parameters. + # TODO: there may be a better way to set parameters, e.g.: + # for name, value in tool.params_to_strings( incoming, trans.app ).iteritems(): + # job.add_parameter( name, value ) + # to make this work, we'd need to flesh out the HDA objects. The code below is + # relatively similar. + for name, value in job_attrs[ 'params' ].items(): + # Transform parameter values when necessary. + if isinstance( value, model.HistoryDatasetAssociation ): + # HDA input: use hid to find input. + input_hda = trans.sa_session.query( model.HistoryDatasetAssociation ) \ + .filter_by( history=new_history, hid=value.hid ).first() + value = input_hda.id + #print "added parameter %s-->%s to job %i" % ( name, value, imported_job.id ) + imported_job.add_parameter( name, to_json_string( value, cls=HistoryDatasetAssociationIDEncoder ) ) + + # TODO: Connect jobs to input datasets. + + # Connect jobs to output datasets. + for output_hid in job_attrs[ 'output_datasets' ]: + #print "%s job has output dataset %i" % (imported_job.id, output_hid) + output_hda = trans.sa_session.query( model.HistoryDatasetAssociation ) \ + .filter_by( history=new_history, hid=output_hid ).first() + if output_hda: + imported_job.add_output_dataset( output_hda.name, output_hda ) + trans.sa_session.flush() + # Cleanup. if os.path.exists( temp_output_dir ): shutil.rmtree( temp_output_dir ) - + return trans.show_ok_message( message="History '%s' has been imported. " % history_attrs['name'] ) except Exception, e: return trans.show_error_message( 'Error importing history archive. ' + str( e ) ) - return trans.show_form( web.FormBuilder( web.url_for(), "Import a History from an Archive", submit_text="Submit" ) .add_input( "file", "Archived History File", "archived_history", value=None, error=None ) ) - @web.expose def export_archive( self, trans, id=None ): """ Export a history. """ + # + # Helper methods/classes. + # + + def unicode_wrangler( a_string ): + """ Convert strings to unicode in utf-8 format. Method should be used for all user input. """ + a_string_type = type ( a_string ) + if a_string_type is str: + return unicode( a_string, 'utf-8' ) + elif a_string_type is unicode: + return a_string.encode( 'utf-8' ) + + def get_item_tag_dict( item ): + """ Create dictionary of an item's tags. 
""" + tags = {} + for tag in item.tags: + tag_user_tname = unicode_wrangler( tag.user_tname ) + tag_user_value = unicode_wrangler( tag.user_value ) + tags[ tag_user_tname ] = tag_user_value + return tags + + class HistoryDatasetAssociationEncoder( simplejson.JSONEncoder ): + """ Custom JSONEncoder for a HistoryDatasetAssociation. """ + def default( self, obj ): + """ Encode an HDA, default encoding for everything else. """ + if isinstance( obj, model.HistoryDatasetAssociation ): + return { + "__HistoryDatasetAssociation__" : True, + "create_time" : obj.create_time.__str__(), + "update_time" : obj.update_time.__str__(), + "hid" : obj.hid, + "name" : unicode_wrangler( obj.name ), + "info" : unicode_wrangler( obj.info ), + "blurb" : obj.blurb, + "peek" : obj.peek, + "extension" : obj.extension, + "metadata" : dict( obj.metadata.items() ), + "parent_id" : obj.parent_id, + "designation" : obj.designation, + "deleted" : obj.deleted, + "visible" : obj.visible, + "file_name" : obj.file_name.split('/')[-1], + "annotation" : unicode_wrangler( obj.annotation ), + "tags" : get_item_tag_dict( obj ), + } + return simplejson.JSONEncoder.default( self, obj ) + + # # Get history to export. + # if id: history = self.get_history( trans, id, check_ownership=False, check_accessible=True ) else: @@ -577,35 +703,24 @@ class HistoryController( BaseController, history_export_dir_name = "./database/export" archive_file_name = '%s/%s.tar.gz' % ( history_export_dir_name, trans.security.encode_id( history.id ) ) + + # + # Do export. + # + # TODO: for now, always create archive when exporting; this is for debugging purposes. if True: # Condition for only creating an archive when history is newer than archive: #not os.path.exists ( archive_file_name ) or datetime.utcfromtimestamp( os.path.getmtime( archive_file_name ) ) < history.update_time: # Create archive and stream back to client. - - # Convert strings to unicode in utf-8 format. Method should be used for all user input. - def unicode_wrangler( a_string ): - a_string_type = type ( a_string ) - if a_string_type is str: - return unicode( a_string, 'utf-8' ) - elif a_string_type is unicode: - return a_string.encode( 'utf-8' ) - - # Create dictionary of an item's tags. - def get_item_tag_dict( item ): - tags = {} - for tag in item.tags: - tag_user_tname = unicode_wrangler( tag.user_tname ) - tag_user_value = unicode_wrangler( tag.user_value ) - tags[ tag_user_tname ] = tag_user_value - return tags - try: # Use temporary directory for temp output files. temp_output_dir = tempfile.mkdtemp() - + + # # Write history attributes to file. + # history_attrs = { "create_time" : history.create_time.__str__(), "update_time" : history.update_time.__str__(), @@ -619,58 +734,103 @@ class HistoryController( BaseController, history_attrs_out = open( history_attrs_file_name, 'w' ) history_attrs_out.write( to_json_string( history_attrs ) ) history_attrs_out.close() - new_name = '%s/%s' % ( temp_output_dir, "history_attrs.txt" ) - os.rename( history_attrs_file_name, new_name ) - history_attrs_file_name = new_name - + + # # Write datasets' attributes to file. 
+                #
                 datasets = self.get_history_datasets( trans, history )
                 datasets_attrs = []
                 for dataset in datasets:
-                    attribute_dict = {
-                        "create_time" : dataset.create_time.__str__(),
-                        "update_time" : dataset.update_time.__str__(),
-                        "hid" : dataset.hid,
-                        "name" : unicode_wrangler( dataset.name ),
-                        "info" : unicode_wrangler( dataset.info ),
-                        "blurb" : dataset.blurb,
-                        "peek" : dataset.peek,
-                        "extension" : dataset.extension,
-                        "metadata" : dict( dataset.metadata.items() ),
-                        "parent_id" : dataset.parent_id,
-                        "designation" : dataset.designation,
-                        "deleted" : dataset.deleted,
-                        "visible" : dataset.visible,
-                        "file_name" : dataset.file_name.split('/')[-1],
-                        "annotation" : unicode_wrangler( self.get_item_annotation_str( trans, history.user, dataset ) ),
-                        "tags" : get_item_tag_dict( dataset )
-                    }
-                    datasets_attrs.append( attribute_dict )
+                    dataset.annotation = self.get_item_annotation_str( trans, history.user, dataset )
+                    datasets_attrs.append( dataset )
                 datasets_attrs_file_name = tempfile.NamedTemporaryFile( dir=temp_output_dir ).name
                 datasets_attrs_out = open( datasets_attrs_file_name, 'w' )
-                datasets_attrs_out.write( to_json_string( datasets_attrs ) )
+                datasets_attrs_out.write( to_json_string( datasets_attrs, cls=HistoryDatasetAssociationEncoder ) )
                 datasets_attrs_out.close()
-                new_name = '%s/%s' % ( temp_output_dir, "datasets_attrs.txt" )
-                os.rename( datasets_attrs_file_name, new_name )
-                datasets_attrs_file_name = new_name
-
-                # Write files to archive: (a) history attributes file; (b) datasets attributes file; and (c) datasets files.
+
+                #
+                # Write jobs attributes file.
+                #
+
+                # Get all jobs associated with HDAs.
+                jobs_dict = {}
+                for hda in datasets:
+                    # Get the associated job, if any. If this hda was copied from another,
+                    # we need to find the job that created the origial hda
+                    job_hda = hda
+                    while job_hda.copied_from_history_dataset_association: #should this check library datasets as well?
+                        job_hda = job_hda.copied_from_history_dataset_association
+                    if not job_hda.creating_job_associations:
+                        # No viable HDA found.
+                        continue
+
+                    # Get the job object.
+                    job = None
+                    for assoc in job_hda.creating_job_associations:
+                        job = assoc.job
+                        break
+                    if not job:
+                        # No viable job.
+                        continue
+
+                    jobs_dict[ job.id ] = job
+
+                # Get jobs' attributes.
+                jobs_attrs = []
+                for id, job in jobs_dict.items():
+                    job_attrs = {}
+                    job_attrs[ 'tool_id' ] = job.tool_id
+                    job_attrs[ 'tool_version' ] = job.tool_version
+                    job_attrs[ 'state' ] = job.state
+
+                    # Get the job's parameters
+                    try:
+                        params_objects = job.get_param_values( trans.app )
+                    except:
+                        # Could not get job params.
+                        continue
+
+                    params_dict = {}
+                    for name, value in params_objects.items():
+                        params_dict[ name ] = value
+                    job_attrs[ 'params' ] = params_dict
+
+                    # Get input, output datasets.
+                    input_datasets = [ assoc.dataset.hid for assoc in job.input_datasets ]
+                    job_attrs[ 'input_datasets' ] = input_datasets
+                    output_datasets = [ assoc.dataset.hid for assoc in job.output_datasets ]
+                    job_attrs[ 'output_datasets' ] = output_datasets
+
+                    jobs_attrs.append( job_attrs )
+
+                jobs_attrs_file_name = tempfile.NamedTemporaryFile( dir=temp_output_dir ).name
+                jobs_attrs_out = open( jobs_attrs_file_name, 'w' )
+                jobs_attrs_out.write( to_json_string( jobs_attrs, cls=HistoryDatasetAssociationEncoder ) )
+                jobs_attrs_out.close()
+
+                #
+                # Write archive and include: (a) history attributes file; (b) datasets attributes file;
+                # (c) jobs attributes file; and (d) datasets files.
+                #
                 history_archive_name = '%s/%s.tar.gz' % ( history_export_dir_name, trans.security.encode_id( history.id ) )
                 history_archive = tarfile.open( history_archive_name, "w:gz" )
                 history_archive.add( history_attrs_file_name, arcname="history_attrs.txt" )
                 history_archive.add( datasets_attrs_file_name, arcname="datasets_attrs.txt" )
+                history_archive.add( jobs_attrs_file_name, arcname="jobs_attrs.txt" )
                 for i, dataset in enumerate( datasets ) :
-                    history_archive.add( dataset.file_name, arcname="datasets/%s" % datasets_attrs[i]['file_name'] )
+                    history_archive.add( dataset.file_name, arcname="datasets/%s" % dataset.file_name.split('/')[-1] )
                 history_archive.close()
-
+
                 # Remove temp directory.
                 if os.path.exists( temp_output_dir ):
                     shutil.rmtree( temp_output_dir )
-
+
             except Exception, e:
                 return trans.show_error_message( 'Error creating history archive. ' + str( e ) )
-
+
+        #
         # Stream archive.
+        #
         if os.path.exists( history_archive_name ):
             valid_chars = '.,^_-()[]0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
             hname = history.name
--- a/lib/galaxy/model/mapping.py
+++ b/lib/galaxy/model/mapping.py
@@ -334,7 +334,8 @@ Job.table = Table( "job", metadata,
     Column( "session_id", Integer, ForeignKey( "galaxy_session.id" ), index=True, nullable=True ),
     Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True, nullable=True ),
     Column( "job_runner_name", String( 255 ) ),
-    Column( "job_runner_external_id", String( 255 ) ) )
+    Column( "job_runner_external_id", String( 255 ) ),
+    Column( "imported", Boolean, default=False, index=True ) )

 JobParameter.table = Table( "job_parameter", metadata,
     Column( "id", Integer, primary_key=True ),
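Not part of the patch, but as a usage sketch: the export code above writes a gzipped tar whose members are history_attrs.txt, datasets_attrs.txt, jobs_attrs.txt, and the raw dataset files under datasets/. The snippet below shows how one might inspect such an archive, applying the same relative-path safety check the import code performs. It assumes only the archive layout named in the patch; the function name and the example path are hypothetical.

    import json
    import tarfile

    def inspect_history_archive( archive_path ):
        """ List the members of an exported history archive and load its attribute files. """
        archive = tarfile.open( archive_path, "r:gz" )
        try:
            # Same safety check as import_archive: reject absolute or parent-relative member paths.
            for member in archive.getmembers():
                if member.name.startswith( "/" ) or ".." in member.name:
                    raise ValueError( "unsafe member path: %s" % member.name )
            attrs = {}
            for name in ( "history_attrs.txt", "datasets_attrs.txt", "jobs_attrs.txt" ):
                attrs[ name ] = json.load( archive.extractfile( name ) )
            return attrs
        finally:
            archive.close()

    # Example (the path is hypothetical; exports land under ./database/export in the patch):
    # attrs = inspect_history_archive( "./database/export/abc123.tar.gz" )
    # print( [ job[ "tool_id" ] for job in attrs[ "jobs_attrs.txt" ] ] )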