galaxy-dist commit e6e68c67dd97: Use a job to import a history from an archive. This makes it possible to move a history from one Galaxy instance to another, subject to the limitations below. The job itself unpacks the archive, and the job's cleanup creates a history from the archive. A history imported via an archive is only visible to the user when it has finished being imported.
# HG changeset patch -- Bitbucket.org # Project galaxy-dist # URL http://bitbucket.org/galaxy/galaxy-dist/overview # User jeremy goecks <jeremy.goecks@emory.edu> # Date 1288106620 14400 # Node ID e6e68c67dd9759154876546662f59ee280b78a34 # Parent 677bcc65f74f5a460277f089d0cdebf170f02831 Use a job to import a history from an archive. This makes it possible to move a history from one Galaxy instance to another, subject to the limitations below. The job itself unpacks the archive, and the job's cleanup creates a history from the archive. A history imported via an archive is only visible to the user when it has finished being imported. Currently, there are limitations to this functionality; these will be fixed going forward. These limitations are: -history archives can be imported only via URL, not file; -histories must be shared in order for them to be importable via archive; -tags are not currently imported; -reproducibility is limited as parameters for imported jobs are not always recovered and set. --- a/lib/galaxy/web/controllers/history.py +++ b/lib/galaxy/web/controllers/history.py @@ -79,7 +79,7 @@ class HistoryListGrid( grids.Grid ): def get_current_item( self, trans, **kwargs ): return trans.get_history() def apply_query_filter( self, trans, query, **kwargs ): - return query.filter_by( user=trans.user, purged=False ) + return query.filter_by( user=trans.user, purged=False, importing=False ) class SharedHistoryListGrid( grids.Grid ): # Custom column types @@ -465,212 +465,40 @@ class HistoryController( BaseController, trans.sa_session.flush() return new_annotation - def import_archive( self, trans, archived_history=None, gzip=True ): - """ Import a history. """ + @web.expose + # TODO: Remove require_login when users are warned that, if they are not + # logged in, this will remove their current history. + @web.require_login( "use Galaxy histories" ) + def import_archive( self, trans, **kwargs ): + """ Import a history from a file archive. """ - def file_in_dir( file_path, a_dir ): - """ Returns true if file is in directory. """ - abs_file_path = os.path.abspath( file_path ) - return os.path.split( abs_file_path )[0] == a_dir + # Set archive source and type. + archive_file = kwargs.get( 'archive_file', None ) + archive_url = kwargs.get( 'archive_url', None ) + archive_source = None + if archive_file: + archive_source = archive_file + archive_type = 'file' + elif archive_url: + archive_source = archive_url + archive_type = 'url' - if archived_history is not None: - try: - history_archive_file = tarfile.open( archived_history.file.name ) - - # Unpack archive in temporary directory. - temp_output_dir = tempfile.mkdtemp() - history_archive_file.extractall( path=temp_output_dir ) - history_archive_file.close() - - # - # Create history. - # - history_attr_file_name = os.path.join( temp_output_dir, 'history_attrs.txt') - if not file_in_dir( history_attr_file_name, temp_output_dir ): - raise Exception( "Invalid location for history attributes file: %s" % history_attr_file_name ) - history_attr_in = open( history_attr_file_name, 'rb' ) - history_attr_str = '' - buffsize = 1048576 - try: - while True: - history_attr_str += history_attr_in.read( buffsize ) - if not history_attr_str or len( history_attr_str ) % buffsize != 0: - break - except OverflowError: - pass - history_attr_in.close() - history_attrs = from_json_string( history_attr_str ) - - # Create history. 
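The rewritten import_archive controller above boils down to a small dispatch step: prefer an uploaded file over a URL, and remember which kind of source was supplied so the import tool knows whether to download or read locally. A condensed, standalone sketch of that step (the kwargs keys come from the patch; the tuple return convention is an assumption of this sketch, not the controller's actual signature):

    def get_archive_source( kwargs ):
        """Pick the archive source the way the new controller does: an uploaded
        file wins over a URL; ( None, None ) means the upload form still has
        to be shown."""
        archive_file = kwargs.get( 'archive_file', None )
        archive_url = kwargs.get( 'archive_url', None )
        if archive_file:
            return archive_file, 'file'
        if archive_url:
            return archive_url, 'url'
        return None, None

    # Hypothetical request parameters:
    # get_archive_source( { 'archive_url': 'http://example.org/history.tar.gz' } )
    # -> ( 'http://example.org/history.tar.gz', 'url' )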
- new_history = model.History( name='imported from archive: %s' % history_attrs['name'].encode( 'utf-8' ), user=trans.user ) - trans.sa_session.add( new_history ) - - new_history.hid_counter = history_attrs['hid_counter'] - new_history.genome_build = history_attrs['genome_build'] - trans.sa_session.flush() - - # Builds a tag string for a tag, value pair. - def get_tag_str( tag, value ): - if not value: - return tag - else: - return tag + ":" + value - - # Add annotation, tags. - if trans.user: - self.add_item_annotation( trans.sa_session, trans.get_user(), new_history, history_attrs[ 'annotation' ] ) - for tag, value in history_attrs[ 'tags' ].items(): - trans.app.tag_handler.apply_item_tags( trans, trans.user, new_history, get_tag_str( tag, value ) ) - - # - # Create datasets. - # - datasets_attrs_file_name = os.path.join( temp_output_dir, 'datasets_attrs.txt') - if not file_in_dir( datasets_attrs_file_name, temp_output_dir ): - raise Exception( "Invalid location for dataset attributes file: %s" % datasets_attrs_file_name ) - datasets_attr_in = open( datasets_attrs_file_name, 'rb' ) - datasets_attr_str = '' - buffsize = 1048576 - try: - while True: - datasets_attr_str += datasets_attr_in.read( buffsize ) - if not datasets_attr_str or len( datasets_attr_str ) % buffsize != 0: - break - except OverflowError: - pass - datasets_attr_in.close() - datasets_attrs = from_json_string( datasets_attr_str ) - - # Create datasets. - for dataset_attrs in datasets_attrs: - metadata = dataset_attrs['metadata'] - - # Create dataset and HDA. - hda = model.HistoryDatasetAssociation( name = dataset_attrs['name'].encode( 'utf-8' ), - extension = dataset_attrs['extension'], - info = dataset_attrs['info'].encode( 'utf-8' ), - blurb = dataset_attrs['blurb'], - peek = dataset_attrs['peek'], - designation = dataset_attrs['designation'], - visible = dataset_attrs['visible'], - dbkey = metadata['dbkey'], - metadata = metadata, - history = new_history, - create_dataset = True, - sa_session = trans.sa_session ) - hda.state = hda.states.OK - trans.sa_session.add( hda ) - trans.sa_session.flush() - new_history.add_dataset( hda, genome_build = None ) - hda.hid = dataset_attrs['hid'] # Overwrite default hid set when HDA added to history. - permissions = trans.app.security_agent.history_get_default_permissions( new_history ) - trans.app.security_agent.set_all_dataset_permissions( hda.dataset, permissions ) - trans.sa_session.flush() - - # Do security check and copy dataset data. - temp_dataset_file_name = os.path.join( temp_output_dir, dataset_attrs['file_name'] ) - if not file_in_dir( temp_dataset_file_name, os.path.join( temp_output_dir, "datasets" ) ): - raise Exception( "Invalid dataset path: %s" % temp_dataset_file_name ) - shutil.move( temp_dataset_file_name, hda.file_name ) - - # Set tags, annotations. - if trans.user: - self.add_item_annotation( trans.sa_session, trans.get_user(), hda, dataset_attrs[ 'annotation' ] ) - for tag, value in dataset_attrs[ 'tags' ].items(): - trans.app.tag_handler.apply_item_tags( trans, trans.user, hda, get_tag_str( tag, value ) ) - trans.sa_session.flush() - - # - # Create jobs. - # - - # Read jobs attributes. 
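The removed controller code above (and the job wrapper that replaces it further down) guards every path taken from the archive's attribute data with a file_in_dir check before reading or moving anything, so a crafted 'file_name' such as '../../etc/passwd' is rejected. A standalone version of the check; note it only accepts files sitting directly in the given directory, which is why dataset paths are checked against the archive's 'datasets' subdirectory. The abspath on a_dir is an addition of this sketch; the patch passes an already-absolute directory:

    import os

    def file_in_dir( file_path, a_dir ):
        """True only if file_path resolves to a file directly inside a_dir
        (subdirectories are deliberately not accepted)."""
        abs_file_path = os.path.abspath( file_path )
        return os.path.split( abs_file_path )[0] == os.path.abspath( a_dir )

    # Hypothetical paths:
    # file_in_dir( '/tmp/archive/datasets/ds_1.dat', '/tmp/archive/datasets' )             -> True
    # file_in_dir( '/tmp/archive/datasets/../../../etc/passwd', '/tmp/archive/datasets' )  -> False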
- jobs_attr_file_name = os.path.join( temp_output_dir, 'jobs_attrs.txt') - if not file_in_dir( jobs_attr_file_name, temp_output_dir ): - raise Exception( "Invalid location for jobs' attributes file: %s" % jobs_attr_file_name ) - jobs_attr_in = open( jobs_attr_file_name, 'rb' ) - jobs_attr_str = '' - buffsize = 1048576 - try: - while True: - jobs_attr_str += jobs_attr_in.read( buffsize ) - if not jobs_attr_str or len( jobs_attr_str ) % buffsize != 0: - break - except OverflowError: - pass - jobs_attr_in.close() - - # Decode jobs attributes. - def as_hda( obj_dct ): - """ Hook to 'decode' an HDA; method uses history and HID to get the HDA represented by - the encoded object. This only works because HDAs are created above. """ - if obj_dct.get( '__HistoryDatasetAssociation__', False ): - return trans.sa_session.query( model.HistoryDatasetAssociation ) \ - .filter_by( history=new_history, hid=obj_dct['hid'] ).first() - return obj_dct - jobs_attrs = from_json_string( jobs_attr_str, object_hook=as_hda ) - - # Create each job. - for job_attrs in jobs_attrs: - imported_job = model.Job() - imported_job.user = trans.user - imported_job.session = trans.get_galaxy_session().id - imported_job.history = new_history - imported_job.tool_id = job_attrs[ 'tool_id' ] - imported_job.tool_version = job_attrs[ 'tool_version' ] - imported_job.set_state( job_attrs[ 'state' ] ) - imported_job.imported = True - trans.sa_session.add( imported_job ) - trans.sa_session.flush() - - class HistoryDatasetAssociationIDEncoder( simplejson.JSONEncoder ): - """ Custom JSONEncoder for a HistoryDatasetAssociation that encodes an HDA as its ID. """ - def default( self, obj ): - """ Encode an HDA, default encoding for everything else. """ - if isinstance( obj, model.HistoryDatasetAssociation ): - return obj.id - return simplejson.JSONEncoder.default( self, obj ) + # If no source to create archive from, show form to upload archive or specify URL. + if not archive_source: + return trans.show_form( + web.FormBuilder( web.url_for(), "Import a History from an Archive", submit_text="Submit" ) \ + .add_input( "text", "Archived History URL", "archive_url", value="", error=None ) + # TODO: add support for importing via a file. + #.add_input( "file", "Archived History File", "archive_file", value=None, error=None ) + ) - # Set parameters. May be useful to look at metadata.py for creating parameters. - # TODO: there may be a better way to set parameters, e.g.: - # for name, value in tool.params_to_strings( incoming, trans.app ).iteritems(): - # job.add_parameter( name, value ) - # to make this work, we'd need to flesh out the HDA objects. The code below is - # relatively similar. - for name, value in job_attrs[ 'params' ].items(): - # Transform parameter values when necessary. - if isinstance( value, model.HistoryDatasetAssociation ): - # HDA input: use hid to find input. - input_hda = trans.sa_session.query( model.HistoryDatasetAssociation ) \ - .filter_by( history=new_history, hid=value.hid ).first() - value = input_hda.id - #print "added parameter %s-->%s to job %i" % ( name, value, imported_job.id ) - imported_job.add_parameter( name, to_json_string( value, cls=HistoryDatasetAssociationIDEncoder ) ) - - # TODO: Connect jobs to input datasets. - - # Connect jobs to output datasets. 
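Job parameters cross the archive boundary as JSON, so HDA references need a custom encoder on the way out (HDA collapses to its id) and an object_hook on the way in (the exporter's marker dict becomes the HDA just recreated in the new history, found by hid). A self-contained round trip using the stdlib json module rather than simplejson; the FakeHDA class and the sample values are illustrative only:

    import json

    class FakeHDA( object ):
        """Toy stand-in for a HistoryDatasetAssociation; only id and hid matter here."""
        def __init__( self, id, hid ):
            self.id = id
            self.hid = hid

    class HDAIDEncoder( json.JSONEncoder ):
        """Collapse an HDA to its database id, as the patch's
        HistoryDatasetAssociationIDEncoder does for job parameters."""
        def default( self, obj ):
            if isinstance( obj, FakeHDA ):
                return obj.id
            return json.JSONEncoder.default( self, obj )

    def make_as_hda( hdas_by_hid ):
        """Build an object_hook that swaps the exporter's marker dict for the
        HDA already created in the new history, looked up by hid."""
        def as_hda( dct ):
            if dct.get( '__HistoryDatasetAssociation__', False ):
                return hdas_by_hid.get( dct['hid'] )
            return dct
        return as_hda

    # Round trip with made-up values:
    hda = FakeHDA( id=42, hid=3 )
    encoded = json.dumps( { 'input': hda, 'threshold': 0.5 }, cls=HDAIDEncoder )
    # encoded == '{"input": 42, "threshold": 0.5}'
    job_attrs = '{"params": {"input": {"__HistoryDatasetAssociation__": true, "hid": 3}}}'
    decoded = json.loads( job_attrs, object_hook=make_as_hda( { 3: hda } ) )
    # decoded['params']['input'] is hda again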
- for output_hid in job_attrs[ 'output_datasets' ]: - #print "%s job has output dataset %i" % (imported_job.id, output_hid) - output_hda = trans.sa_session.query( model.HistoryDatasetAssociation ) \ - .filter_by( history=new_history, hid=output_hid ).first() - if output_hda: - imported_job.add_output_dataset( output_hda.name, output_hda ) - trans.sa_session.flush() - - # Cleanup. - if os.path.exists( temp_output_dir ): - shutil.rmtree( temp_output_dir ) - - return trans.show_ok_message( message="History '%s' has been imported. " % history_attrs['name'] ) - except Exception, e: - return trans.show_error_message( 'Error importing history archive. ' + str( e ) ) - - return trans.show_form( - web.FormBuilder( web.url_for(), "Import a History from an Archive", submit_text="Submit" ) - .add_input( "file", "Archived History File", "archived_history", value=None, error=None ) - ) - + # Run job to do import. + history_imp_tool = trans.app.toolbox.tools_by_id[ '__IMPORT_HISTORY__' ] + incoming = { '__ARCHIVE_SOURCE__' : archive_source, '__ARCHIVE_TYPE__' : archive_type } + history_imp_tool.execute( trans, incoming=incoming ) + return trans.show_message( "Importing history from '%s'. \ + This history will be visible when the import is complete" % archive_source ) + @web.expose def export_archive( self, trans, id=None, gzip=True, include_hidden=False, include_deleted=False ): """ Export a history to an archive. """ --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -608,6 +608,7 @@ class JobWrapper( object ): if self.app.config.set_metadata_externally: self.external_output_metadata.cleanup_external_metadata( self.sa_session ) galaxy.tools.imp_exp.JobExportHistoryArchiveWrapper( self.job_id ).cleanup_after_job( self.sa_session ) + galaxy.tools.imp_exp.JobImportHistoryArchiveWrapper( self.job_id ).cleanup_after_job( self.sa_session ) except: log.exception( "Unable to cleanup job %d" % self.job_id ) --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -227,7 +227,13 @@ class JobExportHistoryArchive( object ): self.history_attrs_filename = history_attrs_filename self.datasets_attrs_filename = datasets_attrs_filename self.jobs_attrs_filename = jobs_attrs_filename - + +class JobImportHistoryArchive( object ): + def __init__( self, job=None, history=None, archive_dir=None ): + self.job = job + self.history = history + self.archive_dir=archive_dir + class Group( object ): def __init__( self, name = None ): self.name = name @@ -244,6 +250,7 @@ class History( object, UsesAnnotations ) self.name = name or "Unnamed history" self.deleted = False self.purged = False + self.importing = False self.genome_build = None self.published = False # Relationships --- a/lib/galaxy/tools/imp_exp/__init__.py +++ b/lib/galaxy/tools/imp_exp/__init__.py @@ -2,7 +2,7 @@ import os, shutil, logging, tempfile, si from galaxy import model from galaxy.web.framework.helpers import to_unicode from galaxy.model.item_attrs import UsesAnnotations -from galaxy.util.json import to_json_string +from galaxy.util.json import * from galaxy.web.base.controller import UsesHistory log = logging.getLogger(__name__) @@ -26,21 +26,252 @@ def load_history_imp_exp_tools( toolbox </outputs></tool> """ + + # Load export tool. tmp_name = tempfile.NamedTemporaryFile() tmp_name.write( tool_xml_text ) tmp_name.flush() history_exp_tool = toolbox.load_tool( tmp_name.name ) toolbox.tools_by_id[ history_exp_tool.id ] = history_exp_tool log.debug( "Loaded history export tool: %s", history_exp_tool.id ) + + # Load import tool. 
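The new importing flag is what keeps a half-built history out of sight: the history is created with importing=True inside the job's cleanup code, the saved-histories grid now filters on importing=False (first hunk of this patch), and the flag is only flipped back once datasets and jobs are in place. A standalone SQLAlchemy sketch of that filter, not Galaxy's real mapping:

    from sqlalchemy import create_engine, Column, Integer, Boolean, String
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class History( Base ):
        """Cut-down history table: just the columns the grid filter touches."""
        __tablename__ = 'history'
        id = Column( Integer, primary_key=True )
        name = Column( String )
        purged = Column( Boolean, default=False )
        importing = Column( Boolean, default=False )

    engine = create_engine( 'sqlite://' )
    Base.metadata.create_all( engine )
    session = sessionmaker( bind=engine )()
    session.add_all( [ History( name='finished import' ),
                       History( name='still importing', importing=True ) ] )
    session.commit()

    # Equivalent of HistoryListGrid.apply_query_filter after this change:
    visible = session.query( History ).filter_by( purged=False, importing=False ).all()
    # -> only 'finished import' is listed until the job's cleanup resets the flag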
+ tool_xml = os.path.join( os.getcwd(), "lib/galaxy/tools/imp_exp/imp_history_from_archive.xml" ) + history_imp_tool = toolbox.load_tool( tool_xml ) + toolbox.tools_by_id[ history_imp_tool.id ] = history_imp_tool + log.debug( "Loaded history import tool: %s", history_imp_tool.id ) + +class JobImportHistoryArchiveWrapper( object, UsesHistory, UsesAnnotations ): + """ + Class provides support for performing jobs that import a history from + an archive. + """ + def __init__( self, job_id ): + self.job_id = job_id + + def cleanup_after_job( self, db_session ): + """ Set history, datasets, and jobs' attributes and clean up archive directory. """ + + # + # Helper methods. + # + + def file_in_dir( file_path, a_dir ): + """ Returns true if file is in directory. """ + abs_file_path = os.path.abspath( file_path ) + return os.path.split( abs_file_path )[0] == a_dir + + def read_file_contents( file_path ): + """ Read contents of a file. """ + fp = open( file_path, 'rb' ) + buffsize = 1048576 + file_contents = '' + try: + while True: + file_contents += fp.read( buffsize ) + if not file_contents or len( file_contents ) % buffsize != 0: + break + except OverflowError: + pass + fp.close() + return file_contents + + def get_tag_str( tag, value ): + """ Builds a tag string for a tag, value pair. """ + if not value: + return tag + else: + return tag + ":" + value + + # + # Import history. + # + + jiha = db_session.query( model.JobImportHistoryArchive ).filter_by( job_id=self.job_id ).first() + if jiha: + try: + archive_dir = jiha.archive_dir + user = jiha.job.user + + # + # Create history. + # + history_attr_file_name = os.path.join( archive_dir, 'history_attrs.txt') + history_attr_str = read_file_contents( history_attr_file_name ) + history_attrs = from_json_string( history_attr_str ) + + # Create history. + new_history = model.History( name='imported from archive: %s' % history_attrs['name'].encode( 'utf-8' ), \ + user=user ) + new_history.importing = True + new_history.hid_counter = history_attrs['hid_counter'] + new_history.genome_build = history_attrs['genome_build'] + db_session.add( new_history ) + jiha.history = new_history + db_session.flush() + + # Add annotation, tags. + if user: + self.add_item_annotation( db_session, user, new_history, history_attrs[ 'annotation' ] ) + """ + TODO: figure out to how add tags to item. + for tag, value in history_attrs[ 'tags' ].items(): + trans.app.tag_handler.apply_item_tags( trans, trans.user, new_history, get_tag_str( tag, value ) ) + """ + + # + # Create datasets. + # + datasets_attrs_file_name = os.path.join( archive_dir, 'datasets_attrs.txt') + datasets_attr_str = read_file_contents( datasets_attrs_file_name ) + datasets_attrs = from_json_string( datasets_attr_str ) + + # Get counts of how often each dataset file is used; a file can + # be linked to multiple dataset objects (HDAs). + datasets_usage_counts = {} + for dataset_attrs in datasets_attrs: + temp_dataset_file_name = \ + os.path.abspath( os.path.join( archive_dir, dataset_attrs['file_name'] ) ) + if ( temp_dataset_file_name not in datasets_usage_counts ): + datasets_usage_counts[ temp_dataset_file_name ] = 0 + datasets_usage_counts[ temp_dataset_file_name ] += 1 + + # Create datasets. + for dataset_attrs in datasets_attrs: + metadata = dataset_attrs['metadata'] + + # Create dataset and HDA. 
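One note on the read_file_contents helper above: the chunked loop breaks when the accumulated length stops being a multiple of buffsize, so for a file whose size is an exact multiple of 1048576 bytes the empty read() at EOF appears to leave both break conditions unsatisfied and the loop never ends. The attribute files are small JSON documents, so a plain read is enough; a minimal replacement sketch:

    def read_file_contents( file_path ):
        """Read an attrs file in one shot. If chunked reading is ever needed,
        break on an empty read() rather than on the accumulated length."""
        fp = open( file_path, 'rb' )
        try:
            return fp.read()
        finally:
            fp.close()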
+ hda = model.HistoryDatasetAssociation( name = dataset_attrs['name'].encode( 'utf-8' ), + extension = dataset_attrs['extension'], + info = dataset_attrs['info'].encode( 'utf-8' ), + blurb = dataset_attrs['blurb'], + peek = dataset_attrs['peek'], + designation = dataset_attrs['designation'], + visible = dataset_attrs['visible'], + dbkey = metadata['dbkey'], + metadata = metadata, + history = new_history, + create_dataset = True, + sa_session = db_session ) + hda.state = hda.states.OK + db_session.add( hda ) + db_session.flush() + new_history.add_dataset( hda, genome_build = None ) + hda.hid = dataset_attrs['hid'] # Overwrite default hid set when HDA added to history. + # TODO: Is there a way to recover permissions? Is this needed? + #permissions = trans.app.security_agent.history_get_default_permissions( new_history ) + #trans.app.security_agent.set_all_dataset_permissions( hda.dataset, permissions ) + db_session.flush() + + # Do security check and move/copy dataset data. + temp_dataset_file_name = \ + os.path.abspath( os.path.join( archive_dir, dataset_attrs['file_name'] ) ) + if not file_in_dir( temp_dataset_file_name, os.path.join( archive_dir, "datasets" ) ): + raise Exception( "Invalid dataset path: %s" % temp_dataset_file_name ) + if datasets_usage_counts[ temp_dataset_file_name ] == 1: + shutil.move( temp_dataset_file_name, hda.file_name ) + else: + datasets_usage_counts[ temp_dataset_file_name ] -= 1 + shutil.copyfile( temp_dataset_file_name, hda.file_name ) + + # Set tags, annotations. + if user: + self.add_item_annotation( db_session, user, hda, dataset_attrs[ 'annotation' ] ) + # TODO: Set tags. + """ + for tag, value in dataset_attrs[ 'tags' ].items(): + trans.app.tag_handler.apply_item_tags( trans, trans.user, hda, get_tag_str( tag, value ) ) + db_session.flush() + """ + + # + # Create jobs. + # + + # Read jobs attributes. + jobs_attr_file_name = os.path.join( archive_dir, 'jobs_attrs.txt') + jobs_attr_str = read_file_contents( jobs_attr_file_name ) + + # Decode jobs attributes. + def as_hda( obj_dct ): + """ Hook to 'decode' an HDA; method uses history and HID to get the HDA represented by + the encoded object. This only works because HDAs are created above. """ + if obj_dct.get( '__HistoryDatasetAssociation__', False ): + return db_session.query( model.HistoryDatasetAssociation ) \ + .filter_by( history=new_history, hid=obj_dct['hid'] ).first() + return obj_dct + jobs_attrs = from_json_string( jobs_attr_str, object_hook=as_hda ) + + # Create each job. + for job_attrs in jobs_attrs: + imported_job = model.Job() + imported_job.user = user + # TODO: set session? + # imported_job.session = trans.get_galaxy_session().id + imported_job.history = new_history + imported_job.tool_id = job_attrs[ 'tool_id' ] + imported_job.tool_version = job_attrs[ 'tool_version' ] + imported_job.set_state( job_attrs[ 'state' ] ) + imported_job.imported = True + db_session.add( imported_job ) + db_session.flush() + + class HistoryDatasetAssociationIDEncoder( simplejson.JSONEncoder ): + """ Custom JSONEncoder for a HistoryDatasetAssociation that encodes an HDA as its ID. """ + def default( self, obj ): + """ Encode an HDA, default encoding for everything else. """ + if isinstance( obj, model.HistoryDatasetAssociation ): + return obj.id + return simplejson.JSONEncoder.default( self, obj ) + + # Set parameters. May be useful to look at metadata.py for creating parameters. 
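Because the exporter can point several HDAs at the same file on disk, the wrapper counts how many HDAs reference each archived file and then copies for every reference but the last, moving the file only on its final use so nothing is left under archive_dir. The same policy as a standalone sketch; datasets_attrs, archive_dir and dest_path_for stand in for the real objects (dest_path_for plays the role of hda.file_name in the patch):

    import os, shutil
    from collections import Counter

    def place_dataset_files( datasets_attrs, archive_dir, dest_path_for ):
        """datasets_attrs: list of dicts with a 'file_name' key, as parsed from
        datasets_attrs.txt; dest_path_for( attrs ) returns the target path for
        that dataset's data."""
        usage = Counter( os.path.abspath( os.path.join( archive_dir, d['file_name'] ) )
                         for d in datasets_attrs )
        for attrs in datasets_attrs:
            src = os.path.abspath( os.path.join( archive_dir, attrs['file_name'] ) )
            if usage[ src ] == 1:
                shutil.move( src, dest_path_for( attrs ) )        # last reference: move
            else:
                usage[ src ] -= 1
                shutil.copyfile( src, dest_path_for( attrs ) )    # more references coming: copy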
+ # TODO: there may be a better way to set parameters, e.g.: + # for name, value in tool.params_to_strings( incoming, trans.app ).iteritems(): + # job.add_parameter( name, value ) + # to make this work, we'd need to flesh out the HDA objects. The code below is + # relatively similar. + for name, value in job_attrs[ 'params' ].items(): + # Transform parameter values when necessary. + if isinstance( value, model.HistoryDatasetAssociation ): + # HDA input: use hid to find input. + input_hda = db_session.query( model.HistoryDatasetAssociation ) \ + .filter_by( history=new_history, hid=value.hid ).first() + value = input_hda.id + #print "added parameter %s-->%s to job %i" % ( name, value, imported_job.id ) + imported_job.add_parameter( name, to_json_string( value, cls=HistoryDatasetAssociationIDEncoder ) ) + + # TODO: Connect jobs to input datasets. + + # Connect jobs to output datasets. + for output_hid in job_attrs[ 'output_datasets' ]: + #print "%s job has output dataset %i" % (imported_job.id, output_hid) + output_hda = db_session.query( model.HistoryDatasetAssociation ) \ + .filter_by( history=new_history, hid=output_hid ).first() + if output_hda: + imported_job.add_output_dataset( output_hda.name, output_hda ) + + # Done importing. + new_history.importing = False + + db_session.flush() + + # Cleanup. + if os.path.exists( archive_dir ): + shutil.rmtree( archive_dir ) + except Exception, e: + jiha.job.stderr += "Error cleaning up history import job: %s" % e + db_session.flush() class JobExportHistoryArchiveWrapper( object, UsesHistory, UsesAnnotations ): - """ Class provides support for performing jobs that export a history to an archive. """ + """ + Class provides support for performing jobs that export a history to an + archive. + """ def __init__( self, job_id ): self.job_id = job_id # TODO: should use db_session rather than trans in this method. def setup_job( self, trans, jeha, include_hidden=False, include_deleted=False ): - # jeha = job_export_history_archive for the job. """ Perform setup for job to export a history into an archive. Method generates attribute files for export, sets the corresponding attributes in the jeha object, and returns a command line for running the job. The command line --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -1904,6 +1904,9 @@ class SetMetadataTool( Tool ): class ExportHistoryTool( Tool ): tool_type = 'export_history' + +class ImportHistoryTool( Tool ): + tool_type = 'import_history' # Populate tool_type to ToolClass mappings tool_types = {} --- a/lib/galaxy/tools/actions/history_imp_exp.py +++ b/lib/galaxy/tools/actions/history_imp_exp.py @@ -1,10 +1,56 @@ +import tempfile from __init__ import ToolAction from galaxy.util.odict import odict -from galaxy.tools.imp_exp import JobExportHistoryArchiveWrapper +from galaxy.tools.imp_exp import * import logging log = logging.getLogger( __name__ ) +class ImportHistoryToolAction( ToolAction ): + """Tool action used for importing a history to an archive. """ + + def execute( self, tool, trans, incoming = {}, set_output_hid = False, overwrite = True ): + # + # Create job. 
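Re-attaching provenance relies on the one identifier that survives the trip between Galaxy instances: the hid. Database ids are instance-local, so both the parameter rewriting and the output-dataset hookup above resolve exported hids against the HDAs just created in the new history. In miniature, with toy classes standing in for model.Job and the HDA:

    class ToyHDA( object ):
        def __init__( self, hid, name ):
            self.hid = hid
            self.name = name

    class ToyJob( object ):
        def __init__( self ):
            self.output_datasets = []
        def add_output_dataset( self, name, hda ):
            self.output_datasets.append( ( name, hda ) )

    def connect_outputs( job, output_hids, hdas_by_hid ):
        """Attach each exported output hid to the matching HDA in the imported
        history; hids that did not make it into the archive are skipped."""
        for hid in output_hids:
            hda = hdas_by_hid.get( hid )
            if hda is not None:
                job.add_output_dataset( hda.name, hda )

    # Hypothetical history with two datasets; the imported job produced the second:
    hdas = { 1: ToyHDA( 1, 'reads.fastq' ), 2: ToyHDA( 2, 'mapped.bam' ) }
    job = ToyJob()
    connect_outputs( job, [ 2 ], hdas )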
+ # + job = trans.app.model.Job() + job.session_id = trans.get_galaxy_session().id + job.history_id = trans.history.id + job.tool_id = tool.id + job.user_id = trans.user.id + start_job_state = job.state #should be job.states.NEW + job.state = job.states.WAITING #we need to set job state to something other than NEW, or else when tracking jobs in db it will be picked up before we have added input / output parameters + trans.sa_session.add( job ) + trans.sa_session.flush() #ensure job.id are available + + # + # Setup job and job wrapper. + # + + # Add association for keeping track of job, history relationship. + archive_dir = tempfile.mkdtemp() + jiha = trans.app.model.JobImportHistoryArchive( job=job, archive_dir=archive_dir ) + trans.sa_session.add( jiha ) + job_wrapper = JobImportHistoryArchiveWrapper( job ) + + # + # Add parameters to job_parameter table. + # + + # Set additional parameters. + incoming[ '__DEST_DIR__' ] = jiha.archive_dir + for name, value in tool.params_to_strings( incoming, trans.app ).iteritems(): + job.add_parameter( name, value ) + + job.state = start_job_state #job inputs have been configured, restore initial job state + trans.sa_session.flush() + + # Queue the job for execution + trans.app.job_queue.put( job.id, tool ) + trans.log_event( "Added import history job to the job queue, id: %s" % str(job.id), tool_id=job.tool_id ) + + return job, odict() + class ExportHistoryToolAction( ToolAction ): """Tool action used for exporting a history to an archive. """ @@ -30,6 +76,7 @@ class ExportHistoryToolAction( ToolActio job.session_id = trans.get_galaxy_session().id job.history_id = trans.history.id job.tool_id = tool.id + job.user_id = trans.user.id start_job_state = job.state #should be job.states.NEW job.state = job.states.WAITING #we need to set job state to something other than NEW, or else when tracking jobs in db it will be picked up before we have added input / output parameters trans.sa_session.add( job ) --- /dev/null +++ b/lib/galaxy/tools/imp_exp/imp_history_from_archive.xml @@ -0,0 +1,10 @@ +<tool id="__IMPORT_HISTORY__" name="Import History" version="0.1" tool_type="import_history"> + <type class="ImportHistoryTool" module="galaxy.tools"/> + <action module="galaxy.tools.actions.history_imp_exp" class="ImportHistoryToolAction"/> + <command interpreter="python">unpack_tar_gz_archive.py $__ARCHIVE_SOURCE__ $__DEST_DIR__ --$__ARCHIVE_TYPE__</command> + <inputs> + <param name="__ARCHIVE_SOURCE__" type="text"/> + <param name="__ARCHIVE_TYPE__" type="text"/> + <param name="__DEST_DIR__" type="text"/> + </inputs> +</tool> --- a/templates/root/index.mako +++ b/templates/root/index.mako @@ -53,10 +53,10 @@ galaxy_main.location = "${h.url_for( controller='history', action='delete_current' )}"; } }, - ##"Other Actions": null, - ##"Import from File": function() { - ## galaxy_main.location = "${h.url_for( controller='history', action='import_archive' )}"; - ##} + "Other Actions": null, + "Import from File": function() { + galaxy_main.location = "${h.url_for( controller='history', action='import_archive' )}"; + } }); // Init tool options. 
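The tool definition above is what ties the pieces together: the wrapped unpack script receives the archive source, the job-private destination directory created by ImportHistoryToolAction, and a --url or --file switch built from __ARCHIVE_TYPE__. Galaxy renders the command with its own tool templating, but the substitution is easy to preview with string.Template (the values below are made up):

    from string import Template

    command = Template( 'unpack_tar_gz_archive.py $__ARCHIVE_SOURCE__ '
                        '$__DEST_DIR__ --$__ARCHIVE_TYPE__' )

    # URL import -- the only source the form currently exposes:
    print( command.substitute( __ARCHIVE_SOURCE__='http://example.org/history.tar.gz',
                               __DEST_DIR__='/tmp/import_job_1',
                               __ARCHIVE_TYPE__='url' ) )
    # unpack_tar_gz_archive.py http://example.org/history.tar.gz /tmp/import_job_1 --url

    # File import -- still disabled in the UI:
    print( command.substitute( __ARCHIVE_SOURCE__='/data/uploads/history.tar.gz',
                               __DEST_DIR__='/tmp/import_job_1',
                               __ARCHIVE_TYPE__='file' ) )
    # unpack_tar_gz_archive.py /data/uploads/history.tar.gz /tmp/import_job_1 --file

The action also parks the job in the WAITING state until its parameters have been written, then restores the original state, so the job runner only picks it up once it is fully configured.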
--- a/lib/galaxy/model/mapping.py +++ b/lib/galaxy/model/mapping.py @@ -77,6 +77,7 @@ History.table = Table( "history", metada Column( "hid_counter", Integer, default=1 ), Column( "deleted", Boolean, index=True, default=False ), Column( "purged", Boolean, index=True, default=False ), + Column( "importing", Boolean, index=True, default=False ), Column( "genome_build", TrimmedString( 40 ) ), Column( "importable", Boolean, default=False ), Column( "slug", TEXT, index=True ), @@ -384,6 +385,13 @@ JobExportHistoryArchive.table = Table( " Column( "jobs_attrs_filename", TEXT ) ) +JobImportHistoryArchive.table = Table( "job_import_history_archive", metadata, + Column( "id", Integer, primary_key=True ), + Column( "job_id", Integer, ForeignKey( "job.id" ), index=True ), + Column( "history_id", Integer, ForeignKey( "history.id" ), index=True ), + Column( "archive_dir", TEXT ) + ) + PostJobAction.table = Table("post_job_action", metadata, Column("id", Integer, primary_key=True), Column("workflow_step_id", Integer, ForeignKey( "workflow_step.id" ), index=True, nullable=False), @@ -1220,6 +1228,9 @@ assign_mapper( context, JobExportHistory properties=dict( job = relation( Job ), history = relation( History ), dataset = relation( Dataset ) ) ) + +assign_mapper( context, JobImportHistoryArchive, JobImportHistoryArchive.table, + properties=dict( job = relation( Job ), history = relation( History ) ) ) assign_mapper( context, PostJobAction, PostJobAction.table, properties=dict(workflow_step = relation( WorkflowStep, backref='post_job_actions', primaryjoin=(WorkflowStep.table.c.id == PostJobAction.table.c.workflow_step_id)))) --- /dev/null +++ b/lib/galaxy/model/migrate/versions/0060_history_archive_import.py @@ -0,0 +1,72 @@ +""" +Migration script to create column and table for importing histories from +file archives. +""" + +from sqlalchemy import * +from sqlalchemy.orm import * +from migrate import * +from migrate.changeset import * + +import logging +log = logging.getLogger( __name__ ) + +metadata = MetaData( migrate_engine ) +db_session = scoped_session( sessionmaker( bind=migrate_engine, autoflush=False, autocommit=True ) ) + +# Columns to add. + +importing_col = Column( "importing", Boolean, index=True, default=False ) +ldda_parent_col = Column( "ldda_parent_id", Integer, ForeignKey( "library_dataset_dataset_association.id" ), index=True ) + +# Table to add. + +JobImportHistoryArchive_table = Table( "job_import_history_archive", metadata, + Column( "id", Integer, primary_key=True ), + Column( "job_id", Integer, ForeignKey( "job.id" ), index=True ), + Column( "history_id", Integer, ForeignKey( "history.id" ), index=True ), + Column( "archive_dir", TEXT ) + ) + +def upgrade(): + print __doc__ + metadata.reflect() + + # Add column to history table and initialize. + try: + History_table = Table( "history", metadata, autoload=True ) + importing_col.create( History_table ) + assert importing_col is History_table.c.importing + + # Initialize column to false. + if migrate_engine.name == 'mysql' or migrate_engine.name == 'sqlite': + default_false = "0" + elif migrate_engine.name == 'postgres': + default_false = "false" + db_session.execute( "UPDATE history SET importing=%s" % default_false ) + except Exception, e: + print str(e) + log.debug( "Adding column 'importing' to history table failed: %s" % str( e ) ) + + # Create job_import_history_archive table. 
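Migration 0060 adds exactly two things: the importing flag on history (backfilled to 0 or false depending on the engine) and the job_import_history_archive bookkeeping table that links the job, the resulting history, and the temporary archive directory. (The ldda_parent_col defined at the top of the script is never used by upgrade() and looks like leftover from another migration.) The resulting schema, sketched with plain SQLAlchemy against an in-memory SQLite database rather than sqlalchemy-migrate:

    from sqlalchemy import ( MetaData, Table, Column, Integer, Boolean, TEXT,
                             ForeignKey, create_engine )

    metadata = MetaData()

    # Minimal stand-ins for the referenced tables so the foreign keys resolve.
    Table( 'job', metadata, Column( 'id', Integer, primary_key=True ) )
    Table( 'history', metadata,
           Column( 'id', Integer, primary_key=True ),
           Column( 'importing', Boolean, index=True, default=False ) )

    Table( 'job_import_history_archive', metadata,
           Column( 'id', Integer, primary_key=True ),
           Column( 'job_id', Integer, ForeignKey( 'job.id' ), index=True ),
           Column( 'history_id', Integer, ForeignKey( 'history.id' ), index=True ),
           Column( 'archive_dir', TEXT ) )

    metadata.create_all( create_engine( 'sqlite://' ) )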
+ try: + JobImportHistoryArchive_table.create() + except Exception, e: + log.debug( "Creating job_import_history_archive table failed: %s" % str( e ) ) + +def downgrade(): + metadata.reflect() + + # Drop 'importing' column from history table. + try: + History_table = Table( "history", metadata, autoload=True ) + importing_col = History_table.c.importing + importing_col.drop() + except Exception, e: + log.debug( "Dropping column 'importing' from history table failed: %s" % ( str( e ) ) ) + + # Drop job_import_history_archive table. + try: + JobImportHistoryArchive_table.drop() + except Exception, e: + log.debug( "Dropping job_import_history_archive table failed: %s" % str( e ) ) --- /dev/null +++ b/lib/galaxy/tools/imp_exp/unpack_tar_gz_archive.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python +""" +Unpack a tar or tar.gz archive into a directory. + +usage: %prog archive_source dest_dir + --[url|file] source type, either a URL or a file. +""" + +import sys, optparse, tarfile, tempfile, urllib2, math + +# Set max size of archive/file that will be handled to be 100 GB. This is +# arbitrary and should be adjusted as needed. +MAX_SIZE = 100 * math.pow( 2, 30 ) + +def url_to_file( url, dest_file ): + """ + Transfer a file from a remote URL to a temporary file. + """ + try: + url_reader = urllib2.urlopen( url ) + CHUNK = 10 * 1024 # 10k + total = 0 + fp = open( dest_file, 'wb') + while True: + chunk = url_reader.read( CHUNK ) + if not chunk: + break + fp.write( chunk ) + total += CHUNK + if total > MAX_SIZE: + break + fp.close() + return dest_file + except Exception, e: + print "Exception getting file from URL: %s" % e, sys.stderr + return None + +def unpack_archive( archive_file, dest_dir ): + """ + Unpack a tar and/or gzipped archive into a destination directory. + """ + archive_fp = tarfile.open( archive_file, mode='r:gz' ) + archive_fp.extractall( path=dest_dir ) + archive_fp.close() + +if __name__ == "__main__": + # Parse command line. + parser = optparse.OptionParser() + parser.add_option( '-U', '--url', dest='is_url', action="store_true", help='Source is a URL.' ) + parser.add_option( '-F', '--file', dest='is_file', action="store_true", help='Source is a URL.' ) + (options, args) = parser.parse_args() + is_url = bool( options.is_url ) + is_file = bool( options.is_file ) + archive_source, dest_dir = args + + try: + # Get archive from URL. + if is_url: + archive_file = url_to_file( archive_source, tempfile.NamedTemporaryFile( dir=dest_dir ).name ) + elif is_file: + archive_file = archive_source + + # Unpack archive. + unpack_archive( archive_file, dest_dir ) + except Exception, e: + print "Error unpacking tar/gz archive: %s" % e, sys.stderr
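Two observations on unpack_tar_gz_archive.py: opening with mode='r:gz' means a plain, uncompressed tar will not unpack even though the docstring allows one ('r:*' auto-detects), and tarfile.extractall() by itself does not stop archive members from escaping dest_dir -- the dataset-path check in the import wrapper only covers files named in the attribute data. A hardening sketch along those lines, offered as an idea rather than part of the patch:

    import os, tarfile

    def safe_extract( archive_path, dest_dir ):
        """Extract a tar/tar.gz archive, refusing members that would land
        outside dest_dir (e.g. names containing '..' or absolute paths)."""
        dest_dir = os.path.abspath( dest_dir )
        archive = tarfile.open( archive_path, mode='r:*' )   # auto-detect compression
        try:
            for member in archive.getmembers():
                target = os.path.abspath( os.path.join( dest_dir, member.name ) )
                if not ( target == dest_dir or target.startswith( dest_dir + os.sep ) ):
                    raise Exception( "Archive member escapes destination: %s" % member.name )
            archive.extractall( path=dest_dir )
        finally:
            archive.close()

Minor nit: the help text for the -F/--file option repeats "Source is a URL." where it presumably means "Source is a file."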