[hg] galaxy 3025: Fixes, cleanup and new functional tests for da...
details: http://www.bx.psu.edu/hg/galaxy/rev/23c1b9799bce changeset: 3025:23c1b9799bce user: Greg Von Kuster <greg@bx.psu.edu> date: Fri Nov 13 14:13:03 2009 -0500 description: Fixes, cleanup and new functional tests for data types and upload: - moved all supported binary data types to the new binary.py - changed GeneTrack data type to subclass from Text rather than Binary - added Sff data type to datatypes_conf.xml.sample - merged test_sniffing_and_metadata_settings.py test script into test_get_data.py - added several additional functional tests for data types to test_get_data.py - fixed some bugs in upload.py when uploading binary data types diffstat: datatypes_conf.xml.sample | 12 +- lib/galaxy/datatypes/binary.py | 156 +++++ lib/galaxy/datatypes/data.py | 88 +-- lib/galaxy/datatypes/genetics.py | 69 +-- lib/galaxy/datatypes/images.py | 120 ---- lib/galaxy/datatypes/registry.py | 16 +- lib/galaxy/datatypes/tracks.py | 8 +- test/functional/test_get_data.py | 620 +++++++++++++++++++--- test/functional/test_sniffing_and_metadata_settings.py | 262 --------- tools/data_source/upload.py | 45 +- 10 files changed, 748 insertions(+), 648 deletions(-) diffs (1914 lines): diff -r 4f9b630f9976 -r 23c1b9799bce datatypes_conf.xml.sample --- a/datatypes_conf.xml.sample Fri Nov 13 10:39:15 2009 -0500 +++ b/datatypes_conf.xml.sample Fri Nov 13 14:13:03 2009 -0500 @@ -1,15 +1,15 @@ <?xml version="1.0"?> <datatypes> <registration converters_path="lib/galaxy/datatypes/converters"> - <datatype extension="ab1" type="galaxy.datatypes.images:Ab1" mimetype="application/octet-stream" display_in_upload="true"/> + <datatype extension="ab1" type="galaxy.datatypes.binary:Ab1" mimetype="application/octet-stream" display_in_upload="true"/> <datatype extension="axt" type="galaxy.datatypes.sequence:Axt" display_in_upload="true"/> - <datatype extension="bam" type="galaxy.datatypes.images:Bam" mimetype="application/octet-stream"/> + <datatype extension="bam" type="galaxy.datatypes.binary:Bam" 
mimetype="application/octet-stream"/> <datatype extension="bed" type="galaxy.datatypes.interval:Bed" display_in_upload="true"> <converter file="bed_to_gff_converter.xml" target_datatype="gff"/> <converter file="interval_to_coverage.xml" target_datatype="coverage"/> <converter file="bed_to_interval_index_converter.xml" target_datatype="interval_index"/> </datatype> - <datatype extension="binseq.zip" type="galaxy.datatypes.images:Binseq" mimetype="application/zip" display_in_upload="true"/> + <datatype extension="binseq.zip" type="galaxy.datatypes.binary:Binseq" mimetype="application/zip" display_in_upload="true"/> <datatype extension="len" type="galaxy.datatypes.chrominfo:ChromInfo" display_in_upload="true"> <!-- no converters yet --> </datatype> @@ -49,12 +49,13 @@ <datatype extension="qualsolid" type="galaxy.datatypes.qualityscore:QualityScoreSOLiD" display_in_upload="true"/> <datatype extension="qual454" type="galaxy.datatypes.qualityscore:QualityScore454" display_in_upload="true"/> <datatype extension="sam" type="galaxy.datatypes.tabular:Sam" display_in_upload="true"/> - <datatype extension="scf" type="galaxy.datatypes.images:Scf" mimetype="application/octet-stream" display_in_upload="true"/> + <datatype extension="scf" type="galaxy.datatypes.binary:Scf" mimetype="application/octet-stream" display_in_upload="true"/> + <datatype extension="sff" type="galaxy.datatypes.binary:Sff" mimetype="application/octet-stream" display_in_upload="true"/> <datatype extension="taxonomy" type="galaxy.datatypes.tabular:Taxonomy" display_in_upload="true"/> <datatype extension="tabular" type="galaxy.datatypes.tabular:Tabular" display_in_upload="true"/> <datatype extension="txt" type="galaxy.datatypes.data:Text" display_in_upload="true"/> <datatype extension="blastxml" type="galaxy.datatypes.xml:BlastXml" display_in_upload="true"/> - <datatype extension="txtseq.zip" type="galaxy.datatypes.images:Txtseq" mimetype="application/zip" display_in_upload="true"/> + <datatype 
extension="txtseq.zip" type="galaxy.datatypes.data:Txtseq" mimetype="application/zip" display_in_upload="true"/> <datatype extension="wig" type="galaxy.datatypes.interval:Wiggle" display_in_upload="true"> <converter file="wiggle_to_array_tree_converter.xml" target_datatype="array_tree"/> </datatype> @@ -190,6 +191,7 @@ defined format first, followed by next-most rigidly defined, and so on. --> + <sniffer type="galaxy.datatypes.binary:Sff"/> <sniffer type="galaxy.datatypes.xml:BlastXml"/> <sniffer type="galaxy.datatypes.sequence:Maf"/> <sniffer type="galaxy.datatypes.sequence:Lav"/> diff -r 4f9b630f9976 -r 23c1b9799bce lib/galaxy/datatypes/binary.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/galaxy/datatypes/binary.py Fri Nov 13 14:13:03 2009 -0500 @@ -0,0 +1,156 @@ +""" +Binary classes +""" + +import data, logging, binascii +from galaxy.datatypes.metadata import MetadataElement +from galaxy.datatypes import metadata +from galaxy.datatypes.sniff import * +from urllib import urlencode, quote_plus +import zipfile +import os, subprocess, tempfile + +log = logging.getLogger(__name__) + +sniffable_binary_formats = [ 'sff' ] +# Currently these supported binary data types must be manually set on upload +unsniffable_binary_formats = [ 'ab1', 'scf' ] + +class Binary( data.Data ): + """Binary data""" + def set_peek( self, dataset ): + """Set the peek and blurb text""" + if not dataset.dataset.purged: + dataset.peek = 'binary data' + dataset.blurb = 'data' + else: + dataset.peek = 'file does not exist' + dataset.blurb = 'file purged from disk' + +class Ab1( Binary ): + """Class describing an ab1 binary sequence file""" + file_ext = "ab1" + def set_peek( self, dataset ): + if not dataset.dataset.purged: + export_url = "/history_add_to?" 
+ urlencode( {'history_id':dataset.history_id,'ext':'ab1','name':'ab1 sequence','info':'Sequence file','dbkey':dataset.dbkey} ) + dataset.peek = "Binary ab1 sequence file" + dataset.blurb = data.nice_size( dataset.get_size() ) + else: + dataset.peek = 'file does not exist' + dataset.blurb = 'file purged from disk' + def display_peek( self, dataset ): + try: + return dataset.peek + except: + return "Binary ab1 sequence file (%s)" % ( data.nice_size( dataset.get_size() ) ) + +class Bam( Binary ): + """Class describing a BAM binary file""" + file_ext = "bam" + MetadataElement( name="bam_index", desc="BAM Index File", param=metadata.FileParameter, readonly=True, no_value=None, visible=False, optional=True ) + def init_meta( self, dataset, copy_from=None ): + Binary.init_meta( self, dataset, copy_from=copy_from ) + def set_meta( self, dataset, overwrite = True, **kwd ): + """ + Sets index for BAM file. + """ + index_file = dataset.metadata.bam_index + if not index_file: + index_file = dataset.metadata.spec['bam_index'].param.new_file( dataset = dataset ) + tmp_dir = tempfile.gettempdir() + tmpf1 = tempfile.NamedTemporaryFile( dir=tmp_dir ) + tmpf1bai = '%s.bai' % tmpf1.name + try: + os.system( 'cd %s' % tmp_dir ) + os.system( 'cp %s %s' % ( dataset.file_name, tmpf1.name ) ) + os.system( 'samtools index %s' % tmpf1.name ) + os.system( 'cp %s %s' % ( tmpf1bai, index_file.file_name ) ) + except Exception, ex: + sys.stderr.write( 'There was a problem creating the index for the BAM file\n%s\n' + str( ex ) ) + tmpf1.close() + if os.path.exists( tmpf1bai ): + os.remove( tmpf1bai ) + dataset.metadata.bam_index = index_file + def set_peek( self, dataset ): + if not dataset.dataset.purged: + export_url = "/history_add_to?" 
+ urlencode( {'history_id':dataset.history_id,'ext':'bam','name':'bam alignments','info':'Alignments file','dbkey':dataset.dbkey} ) + dataset.peek = "Binary bam alignments file" + dataset.blurb = data.nice_size( dataset.get_size() ) + else: + dataset.peek = 'file does not exist' + dataset.blurb = 'file purged from disk' + def display_peek( self, dataset ): + try: + return dataset.peek + except: + return "Binary bam alignments file (%s)" % ( data.nice_size( dataset.get_size() ) ) + def get_mime( self ): + """Returns the mime type of the datatype""" + return 'application/octet-stream' + +class Binseq( Binary ): + """Class describing a zip archive of binary sequence files""" + file_ext = "binseq.zip" + def set_peek( self, dataset ): + if not dataset.dataset.purged: + zip_file = zipfile.ZipFile( dataset.file_name, "r" ) + num_files = len( zip_file.namelist() ) + dataset.peek = "Archive of %s binary sequence files" % ( str( num_files ) ) + dataset.blurb = data.nice_size( dataset.get_size() ) + else: + dataset.peek = 'file does not exist' + dataset.blurb = 'file purged from disk' + def display_peek( self, dataset ): + try: + return dataset.peek + except: + return "Binary sequence file archive (%s)" % ( data.nice_size( dataset.get_size() ) ) + def get_mime( self ): + """Returns the mime type of the datatype""" + return 'application/zip' + +class Scf( Binary ): + """Class describing an scf binary sequence file""" + file_ext = "scf" + def set_peek( self, dataset ): + if not dataset.dataset.purged: + export_url = "/history_add_to?" 
+ urlencode({'history_id':dataset.history_id,'ext':'scf','name':'scf sequence','info':'Sequence file','dbkey':dataset.dbkey}) + dataset.peek = "Binary scf sequence file" + dataset.blurb = data.nice_size( dataset.get_size() ) + else: + dataset.peek = 'file does not exist' + dataset.blurb = 'file purged from disk' + def display_peek( self, dataset ): + try: + return dataset.peek + except: + return "Binary scf sequence file (%s)" % ( data.nice_size( dataset.get_size() ) ) + +class Sff( Binary ): + """ Standard Flowgram Format (SFF) """ + file_ext = "sff" + def __init__( self, **kwd ): + Binary.__init__( self, **kwd ) + def sniff( self, filename ): + # The first 4 bytes of any sff file is '.sff', and the file is binary. For details + # about the format, see http://www.ncbi.nlm.nih.gov/Traces/trace.cgi?cmd=show&f=formats&m=doc&s=format + try: + header = open( filename ).read(4) + if binascii.b2a_hex( header ) == binascii.hexlify( '.sff' ): + return True + return False + except Exception, e: + return False + def set_peek( self, dataset ): + if not dataset.dataset.purged: + export_url = "/history_add_to?" 
+ urlencode( {'history_id':dataset.history_id,'ext':'sff','name':'sff file','info':'sff file','dbkey':dataset.dbkey} ) + dataset.peek = "Binary sff file" + dataset.blurb = data.nice_size( dataset.get_size() ) + else: + dataset.peek = 'file does not exist' + dataset.blurb = 'file purged from disk' + def display_peek( self, dataset ): + try: + return dataset.peek + except: + return "Binary sff file (%s)" % ( data.nice_size( dataset.get_size() ) ) diff -r 4f9b630f9976 -r 23c1b9799bce lib/galaxy/datatypes/data.py --- a/lib/galaxy/datatypes/data.py Fri Nov 13 10:39:15 2009 -0500 +++ b/lib/galaxy/datatypes/data.py Fri Nov 13 14:13:03 2009 -0500 @@ -1,4 +1,4 @@ -import logging, os, sys, time, tempfile, binascii +import logging, os, sys, time, tempfile from galaxy import util from galaxy.util.odict import odict from galaxy.util.bunch import Bunch @@ -40,20 +40,18 @@ """ __metaclass__ = DataMeta - - """Add metadata elements""" + # Add metadata elements MetadataElement( name="dbkey", desc="Database/Build", default="?", param=metadata.DBKeyParameter, multiple=False, no_value="?" ) - - """Stores the set of display applications, and viewing methods, supported by this datatype """ + # Stores the set of display applications, and viewing methods, supported by this datatype supported_display_apps = {} - - """If False, the peek is regenerated whenever a dataset of this type is copied""" + # If False, the peek is regenerated whenever a dataset of this type is copied copy_safe_peek = True - - is_binary = True #The dataset contains binary data --> do not space_to_tab or convert newlines, etc. Allow binary file uploads of this type when True. - - allow_datatype_change = True #Allow user to change between this datatype and others. If False, this datatype cannot be changed from or into. - + # The dataset contains binary data --> do not space_to_tab or convert newlines, etc. + # Allow binary file uploads of this type when True. 
+ is_binary = True + # Allow user to change between this datatype and others. If False, this datatype + # cannot be changed from or into. + allow_datatype_change = True #Composite datatypes composite_type = None composite_files = odict() @@ -270,8 +268,6 @@ def add_composite_file( self, name, **kwds ): #self.composite_files = self.composite_files.copy() self.composite_files[ name ] = self.__new_composite_file( name, **kwds ) - - def __substitute_composite_key( self, key, composite_file, dataset = None ): if composite_file.substitute_name_with_metadata: if dataset: @@ -303,7 +299,6 @@ return files def generate_auto_primary_file( self, dataset = None ): raise Exception( "generate_auto_primary_file is not implemented for this datatype." ) - @property def has_resolution(self): return False @@ -364,23 +359,37 @@ dataset.peek = 'file does not exist' dataset.blurb = 'file purged from disk' -class Binary( Data ): - """Binary data""" +class Txtseq( Data ): + """Class describing a zip archive of text sequence files""" + file_ext = "txtseq.zip" def set_peek( self, dataset ): - """Set the peek and blurb text""" if not dataset.dataset.purged: - dataset.peek = 'binary data' - dataset.blurb = 'data' + zip_file = zipfile.ZipFile( dataset.file_name, "r" ) + num_files = len( zip_file.namelist() ) + dataset.peek = "Archive of %s text sequence files" % ( str( num_files ) ) + dataset.blurb = data.nice_size( dataset.get_size() ) else: dataset.peek = 'file does not exist' dataset.blurb = 'file purged from disk' + def display_peek(self, dataset): + try: + return dataset.peek + except: + return "Text sequence file archive (%s)" % ( data.nice_size( dataset.get_size() ) ) + def get_mime(self): + """Returns the mime type of the datatype""" + return 'application/zip' + +class Newick( Text ): + pass + +# ------------- Utility methods -------------- def get_test_fname( fname ): """Returns test data filename""" path, name = os.path.split(__file__) full_path = os.path.join( path, 'test', fname ) 
return full_path - def nice_size(size): """ Returns a readably formatted string with the size @@ -406,7 +415,6 @@ out = "%.1f %s" % (size, word) return out return '??? bytes' - def get_file_peek( file_name, is_multi_byte=False, WIDTH=256, LINE_COUNT=5 ): """ Returns the first LINE_COUNT lines wrapped to WIDTH @@ -443,7 +451,6 @@ else: text = unicode( '\n'.join( lines ), 'utf-8' ) return text - def get_line_count(file_name): """Returns the number of lines in a file that are neither null nor comments""" count = 0 @@ -452,38 +459,3 @@ if line and line[0] != '#': count += 1 return count - -class Newick( Text ): - pass - -class Sff( Binary ): - """ Standard Flowgram Format (SFF) """ - file_ext = "sff" - def __init__( self, **kwd ): - Binary.__init__(self, **kwd) - def init_meta( self, dataset, copy_from=None ): - Binary.init_meta( self, dataset, copy_from=copy_from ) - def sniff( self, filename ): - ''' - The first 4 bytes of any sff file is '.sff' - - >>> fname = get_test_fname( '1.sff' ) - >>> Sff().sniff( fname ) - True - ''' - header = open( filename ).read(4) - if binascii.b2a_hex( header ) == binascii.hexlify( '.sff' ): - return True - return False - def set_peek( self, dataset ): - if not dataset.dataset.purged: - dataset.peek = "Binary sff file" - dataset.blurb = nice_size( dataset.get_size() ) - else: - dataset.peek = 'file does not exist' - dataset.blurb = 'file purged from disk' - def display_peek(self, dataset): - try: - return dataset.peek - except: - return "sff file (%s)" % ( nice_size( dataset.get_size() ) ) diff -r 4f9b630f9976 -r 23c1b9799bce lib/galaxy/datatypes/genetics.py --- a/lib/galaxy/datatypes/genetics.py Fri Nov 13 10:39:15 2009 -0500 +++ b/lib/galaxy/datatypes/genetics.py Fri Nov 13 14:13:03 2009 -0500 @@ -48,10 +48,8 @@ """Initialize datatype, by adding GBrowse display app""" Interval.__init__(self, **kwd) self.add_display_app ( 'ucsc', 'display at UCSC', 'as_ucsc_display_file', 'ucsc_links' ) - def as_ucsc_display_file( self, dataset, **kwd 
): return open( dataset.file_name ) - def set_meta( self, dataset, overwrite = True, **kwd ): i = 0 for i, line in enumerate( file ( dataset.file_name ) ): @@ -66,7 +64,6 @@ except: pass Interval.set_meta( self, dataset, overwrite = overwrite, skip = i ) - def make_html_table( self, dataset, skipchars=[] ): """Create HTML table, used for displaying peek""" out = ['<table cellspacing="0" cellpadding="3">'] @@ -82,7 +79,6 @@ except Exception, exc: out = "Can't create peek %s" % exc return out - def get_estimated_display_viewport( self, dataset ): """ Return a chrom, start, stop tuple for viewing a file. There are slight differences between gff 2 and gff 3 @@ -118,7 +114,6 @@ return ( seqid, str( start ), str( stop ) ) else: return ( '', '', '' ) - def gbrowse_links( self, dataset, type, app, base_url ): ret_val = [] if dataset.has_data: @@ -132,7 +127,6 @@ link = "%s?start=%s&stop=%s&ref=%s&dbkey=%s" % ( site_url, start, stop, seqid, dataset.dbkey ) ret_val.append( ( site_name, link ) ) return ret_val - def ucsc_links( self, dataset, type, app, base_url ): ret_val = [] if dataset.has_data: @@ -162,8 +156,6 @@ else: gal_Log.debug('@@@ gg ucsc_links - no viewport_tuple') return ret_val - - def sniff( self, filename ): """ Determines whether the file is in gff format @@ -202,20 +194,16 @@ except: return False - - class rgTabList(Tabular): """ for sampleid and for featureid lists of exclusions or inclusions in the clean tool featureid subsets on statistical criteria -> specialized display such as gg """ file_ext = "rgTList" - def __init__(self, **kwd): """Initialize featurelistt datatype""" Tabular.__init__( self, **kwd ) self.column_names = [] - def make_html_table( self, dataset, skipchars=[] ): """Create HTML table, used for displaying peek""" out = ['<table cellspacing="0" cellpadding="3">'] @@ -236,7 +224,6 @@ out = "Can't create peek %s" % exc return out - class rgSampleList(rgTabList): """ for sampleid exclusions or inclusions in the clean tool output from QC eg 
excess het, gender error, ibd pair member,eigen outlier,excess mendel errors,... @@ -252,7 +239,6 @@ self.column_names[0] = 'FID' self.column_names[1] = 'IID' # this is what Plink wants as at 2009 - def sniff(self,filename): """ """ @@ -276,26 +262,22 @@ rgTabList.__init__( self, **kwd ) for i,s in enumerate(['#FeatureId', 'Chr', 'Genpos', 'Mappos']): self.column_names[i] = s - class Rgenetics(Html): """class to use for rgenetics""" - MetadataElement( name="base_name", desc="base name for all transformed versions of this genetic dataset", default="rgenetics", - readonly=True, set_in_upload=True) + MetadataElement( name="base_name", desc="base name for all transformed versions of this genetic dataset", default="rgenetics", readonly=True, set_in_upload=True) composite_type = 'auto_primary_file' allow_datatype_change = False file_ext = 'rgenetics' - def missing_meta( self, dataset=None, **kwargs): """Checks for empty meta values""" for key, value in dataset.metadata.items(): if not value: return True return False - def generate_primary_file( self, dataset = None ): rval = ['<html><head><title>Rgenetics Galaxy Composite Dataset </title></head><p/>'] rval.append('<div>This composite dataset is composed of the following files:<p/><ul>') @@ -306,7 +288,6 @@ rval.append( '<li><a href="%s" type="application/binary">%s</a>%s' % ( composite_name, composite_name, opt_text ) ) rval.append( '</ul></div></html>' ) return "\n".join( rval ) - def regenerate_primary_file(self,dataset): """cannot do this until we are setting metadata """ @@ -332,12 +313,8 @@ f.write("\n".join( rval )) f.write('\n') f.close() - def set_meta( self, dataset, **kwd ): - - """for lped/pbed eg - - """ + """for lped/pbed eg""" if kwd.get('overwrite') == False: if verbose: gal_Log.debug('@@@ rgenetics set_meta called with overwrite = False') @@ -349,9 +326,10 @@ gal_Log.debug('@@@rgenetics set_meta failed %s - dataset %s has no efp ?' 
% (sys.exc_info()[0], dataset.name)) return False try: - flist = os.listdir(efp) - except: - if verbose: gal_Log.debug('@@@rgenetics set_meta failed %s - dataset %s has no efp ?' % (sys.exc_info()[0],dataset.name)) + flist = os.listdir(efp) + except: + if verbose: + gal_Log.debug('@@@rgenetics set_meta failed %s - dataset %s has no efp ?' % (sys.exc_info()[0],dataset.name)) return False if len(flist) == 0: if verbose: @@ -372,7 +350,6 @@ dataset.blurb = 'Composite file - Rgenetics Galaxy toolkit' return True - class SNPMatrix(Rgenetics): """fake class to distinguish different species of Rgenetics data collections """ @@ -385,7 +362,6 @@ else: dataset.peek = 'file does not exist' dataset.blurb = 'file purged from disk' - def sniff(self,filename): """ need to check the file header hex code """ @@ -397,7 +373,6 @@ else: return True - class Lped(Rgenetics): """fake class to distinguish different species of Rgenetics data collections """ @@ -408,7 +383,6 @@ self.add_composite_file( '%s.ped', description = 'Pedigree File', substitute_name_with_metadata = 'base_name', is_binary = True ) self.add_composite_file( '%s.map', description = 'Map File', substitute_name_with_metadata = 'base_name', is_binary = True ) - class Pphe(Rgenetics): """fake class to distinguish different species of Rgenetics data collections """ @@ -418,7 +392,6 @@ Rgenetics.__init__(self, **kwd) self.add_composite_file( '%s.pphe', description = 'Plink Phenotype File', substitute_name_with_metadata = 'base_name' ) - class Lmap(Rgenetics): """fake class to distinguish different species of Rgenetics data collections """ @@ -442,8 +415,6 @@ Rgenetics.__init__(self, **kwd) self.add_composite_file( '%s.phe', description = 'Phenotype File', substitute_name_with_metadata = 'base_name' ) - - class Fped(Rgenetics): """fake class to distinguish different species of Rgenetics data collections """ @@ -453,7 +424,6 @@ Rgenetics.__init__(self, **kwd) self.add_composite_file( '%s.fped', description = 'FBAT format 
pedfile', substitute_name_with_metadata = 'base_name' ) - class Pbed(Rgenetics): """fake class to distinguish different species of Rgenetics data collections """ @@ -475,8 +445,6 @@ self.add_composite_file( '%s.eigenstratgeno', substitute_name_with_metadata = 'base_name', is_binary = True ) self.add_composite_file( '%s.ind', substitute_name_with_metadata = 'base_name', is_binary = True ) self.add_composite_file( '%s.map', substitute_name_with_metadata = 'base_name', is_binary = True ) - - class Eigenstratpca(Rgenetics): """fake class to distinguish different species of Rgenetics data collections @@ -487,20 +455,17 @@ Rgenetics.__init__(self, **kwd) self.add_composite_file( '%s.eigenstratpca', description = 'Eigenstrat PCA file', substitute_name_with_metadata = 'base_name' ) - class Snptest(Rgenetics): """fake class to distinguish different species of Rgenetics data collections """ file_ext="snptest" - class Pheno(Tabular): """ base class for pheno files """ file_ext = 'pheno' - class RexpBase( Html ): """base class for BioC data structures in Galaxy must be constructed with the pheno data in place since that @@ -518,18 +483,15 @@ composite_type = 'auto_primary_file' allow_datatype_change = False - def __init__( self, **kwd ): Html.__init__(self,**kwd) self.add_composite_file( '%s.pheno', description = 'Phenodata tab text file', substitute_name_with_metadata = 'base_name', is_binary=True) - def generate_primary_file( self, dataset = None ): """ This is called only at upload to write the html file cannot rename the datasets here - they come with the default unfortunately """ return '<html><head></head><body>AutoGenerated Primary File for Composite Dataset</body></html>' - def get_phecols(self, phenolist=[], maxConc=20): """ sept 2009: cannot use whitespace to split - make a more complex structure here @@ -601,8 +563,6 @@ res = [('no usable phenotype columns found',[('?',0),]),] return res - - def get_pheno(self,dataset): """expects a .pheno file in the 
extra_files_dir - ugh note that R is wierd and adds the row.name in @@ -620,7 +580,6 @@ else: p = [] return '\n'.join(p) - def set_peek( self, dataset ): """expects a .pheno file in the extra_files_dir - ugh note that R is wierd and does not include the row.name in @@ -636,7 +595,6 @@ else: dataset.peek = 'file does not exist\n' dataset.blurb = 'file purged from disk' - def get_peek( self, dataset ): """expects a .pheno file in the extra_files_dir - ugh """ @@ -646,7 +604,6 @@ except: p = ['##failed to find %s' % pp] return ''.join(p[:5]) - def get_file_peek(self,filename): """ can't really peek at a filename - need the extra_files_path and such? @@ -657,7 +614,6 @@ except: pass return ''.join(h[:5]) - def regenerate_primary_file(self,dataset): """cannot do this until we are setting metadata """ @@ -672,18 +628,14 @@ f.write("\n".join( rval )) f.write('\n') f.close() - - """Add metadata elements""" def init_meta( self, dataset, copy_from=None ): + """Add metadata elements""" if copy_from: dataset.metadata = copy_from.metadata - def set_meta( self, dataset, **kwd ): - """ NOTE we apply the tabular machinary to the phenodata extracted from a BioC eSet or affybatch. 
- """ try: flist = os.listdir(dataset.extra_files_path) @@ -727,7 +679,6 @@ if not dataset.blurb: dataset.blurb = 'R loadable BioC expression object for the Rexpression Galaxy toolkit' return True - def make_html_table( self, pp='nothing supplied from peek\n'): """Create HTML table, used for displaying peek""" out = ['<table cellspacing="0" cellpadding="3">',] @@ -750,16 +701,13 @@ except Exception, exc: out = "Can't create html table %s" % str( exc ) return out - def display_peek( self, dataset ): """Returns formatted html of peek""" out=self.make_html_table(dataset.peek) return out - def get_mime(self): """Returns the mime type of the datatype""" return 'text/html' - class Affybatch( RexpBase ): """derived class for BioC data structures in Galaxy """ @@ -790,9 +738,6 @@ self.add_composite_file( '%s.malist', description = 'MAlist R object saved to file', substitute_name_with_metadata = 'base_name', is_binary = True ) - if __name__ == '__main__': import doctest, sys doctest.testmod(sys.modules[__name__]) - - diff -r 4f9b630f9976 -r 23c1b9799bce lib/galaxy/datatypes/images.py --- a/lib/galaxy/datatypes/images.py Fri Nov 13 10:39:15 2009 -0500 +++ b/lib/galaxy/datatypes/images.py Fri Nov 13 14:13:03 2009 -0500 @@ -13,82 +13,6 @@ log = logging.getLogger(__name__) -class Ab1( data.Data ): - """Class describing an ab1 binary sequence file""" - file_ext = "ab1" - def set_peek( self, dataset ): - if not dataset.dataset.purged: - export_url = "/history_add_to?" 
+ urlencode({'history_id':dataset.history_id,'ext':'ab1','name':'ab1 sequence','info':'Sequence file','dbkey':dataset.dbkey}) - dataset.peek = "Binary ab1 sequence file" - dataset.blurb = data.nice_size( dataset.get_size() ) - else: - dataset.peek = 'file does not exist' - dataset.blurb = 'file purged from disk' - def display_peek(self, dataset): - try: - return dataset.peek - except: - return "Binary ab1 sequence file (%s)" % ( data.nice_size( dataset.get_size() ) ) - -class Scf( data.Data ): - """Class describing an scf binary sequence file""" - file_ext = "scf" - def set_peek( self, dataset ): - if not dataset.dataset.purged: - export_url = "/history_add_to?" + urlencode({'history_id':dataset.history_id,'ext':'scf','name':'scf sequence','info':'Sequence file','dbkey':dataset.dbkey}) - dataset.peek = "Binary scf sequence file" - dataset.blurb = data.nice_size( dataset.get_size() ) - else: - dataset.peek = 'file does not exist' - dataset.blurb = 'file purged from disk' - def display_peek(self, dataset): - try: - return dataset.peek - except: - return "Binary scf sequence file (%s)" % ( data.nice_size( dataset.get_size() ) ) - -class Binseq( data.Data ): - """Class describing a zip archive of binary sequence files""" - file_ext = "binseq.zip" - def set_peek( self, dataset ): - if not dataset.dataset.purged: - zip_file = zipfile.ZipFile( dataset.file_name, "r" ) - num_files = len( zip_file.namelist() ) - dataset.peek = "Archive of %s binary sequence files" % ( str( num_files ) ) - dataset.blurb = data.nice_size( dataset.get_size() ) - else: - dataset.peek = 'file does not exist' - dataset.blurb = 'file purged from disk' - def display_peek(self, dataset): - try: - return dataset.peek - except: - return "Binary sequence file archive (%s)" % ( data.nice_size( dataset.get_size() ) ) - def get_mime(self): - """Returns the mime type of the datatype""" - return 'application/zip' - -class Txtseq( data.Data ): - """Class describing a zip archive of text sequence files""" - 
file_ext = "txtseq.zip" - def set_peek( self, dataset ): - if not dataset.dataset.purged: - zip_file = zipfile.ZipFile( dataset.file_name, "r" ) - num_files = len( zip_file.namelist() ) - dataset.peek = "Archive of %s text sequence files" % ( str( num_files ) ) - dataset.blurb = data.nice_size( dataset.get_size() ) - else: - dataset.peek = 'file does not exist' - dataset.blurb = 'file purged from disk' - def display_peek(self, dataset): - try: - return dataset.peek - except: - return "Text sequence file archive (%s)" % ( data.nice_size( dataset.get_size() ) ) - def get_mime(self): - """Returns the mime type of the datatype""" - return 'application/zip' - class Image( data.Data ): """Class describing an image""" def set_peek( self, dataset ): @@ -236,47 +160,3 @@ return dataset.peek except: return "peek unavailable" - -class Bam( data.Binary ): - """Class describing a BAM binary file""" - file_ext = "bam" - MetadataElement( name="bam_index", desc="BAM Index File", param=metadata.FileParameter, readonly=True, no_value=None, visible=False, optional=True ) - def init_meta( self, dataset, copy_from=None ): - data.Binary.init_meta( self, dataset, copy_from=copy_from ) - def set_meta( self, dataset, overwrite = True, **kwd ): - """ - Sets index for BAM file. 
- """ - index_file = dataset.metadata.bam_index - if not index_file: - index_file = dataset.metadata.spec['bam_index'].param.new_file( dataset = dataset ) - tmp_dir = tempfile.gettempdir() - tmpf1 = tempfile.NamedTemporaryFile(dir=tmp_dir) - tmpf1bai = '%s.bai' % tmpf1.name - try: - os.system('cd %s' % tmp_dir) - os.system('cp %s %s' % (dataset.file_name, tmpf1.name)) - os.system('samtools index %s' % tmpf1.name) - os.system('cp %s %s' % (tmpf1bai, index_file.file_name)) - except Exception, ex: - sys.stderr.write('There was a problem creating the index for the BAM file\n%s\n' + str(ex)) - tmpf1.close() - if os.path.exists(tmpf1bai): - os.remove(tmpf1bai) - dataset.metadata.bam_index = index_file - def set_peek( self, dataset ): - if not dataset.dataset.purged: - export_url = "/history_add_to?" + urlencode({'history_id':dataset.history_id,'ext':'bam','name':'bam alignments','info':'Alignments file','dbkey':dataset.dbkey}) - dataset.peek = "Binary bam alignments file" - dataset.blurb = data.nice_size( dataset.get_size() ) - else: - dataset.peek = 'file does not exist' - dataset.blurb = 'file purged from disk' - def display_peek(self, dataset): - try: - return dataset.peek - except: - return "Binary bam alignments file (%s)" % ( data.nice_size( dataset.get_size() ) ) - def get_mime(self): - """Returns the mime type of the datatype""" - return 'application/octet-stream' diff -r 4f9b630f9976 -r 23c1b9799bce lib/galaxy/datatypes/registry.py --- a/lib/galaxy/datatypes/registry.py Fri Nov 13 10:39:15 2009 -0500 +++ b/lib/galaxy/datatypes/registry.py Fri Nov 13 14:13:03 2009 -0500 @@ -3,7 +3,7 @@ """ import os, tempfile import logging -import data, tabular, interval, images, sequence, qualityscore, genetics, xml, coverage, tracks, chrominfo +import data, tabular, interval, images, sequence, qualityscore, genetics, xml, coverage, tracks, chrominfo, binary import galaxy.util from galaxy.util.odict import odict @@ -109,11 +109,11 @@ #default values if 
len(self.datatypes_by_extension) < 1: self.datatypes_by_extension = { - 'ab1' : images.Ab1(), + 'ab1' : binary.Ab1(), 'axt' : sequence.Axt(), - 'bam' : images.Bam(), + 'bam' : binary.Bam(), 'bed' : interval.Bed(), - 'binseq.zip' : images.Binseq(), + 'binseq.zip' : binary.Binseq(), 'blastxml' : xml.BlastXml(), 'coverage' : coverage.LastzCoverage(), 'customtrack' : interval.CustomTrack(), @@ -132,12 +132,12 @@ 'qualsolexa' : qualityscore.QualityScoreSolexa(), 'qual454' : qualityscore.QualityScore454(), 'sam' : tabular.Sam(), - 'scf' : images.Scf(), - 'sff' : data.Sff(), + 'scf' : binary.Scf(), + 'sff' : binary.Sff(), 'tabular' : tabular.Tabular(), 'taxonomy' : tabular.Taxonomy(), 'txt' : data.Text(), - 'txtseq.zip' : images.Txtseq(), + 'txtseq.zip' : data.Txtseq(), 'wig' : interval.Wiggle() } self.mimetypes_by_extension = { @@ -174,7 +174,7 @@ # because some formats are much more flexibly defined than others. if len(self.sniff_order) < 1: self.sniff_order = [ - data.Sff(), + binary.Sff(), xml.BlastXml(), sequence.Maf(), sequence.Lav(), diff -r 4f9b630f9976 -r 23c1b9799bce lib/galaxy/datatypes/tracks.py --- a/lib/galaxy/datatypes/tracks.py Fri Nov 13 10:39:15 2009 -0500 +++ b/lib/galaxy/datatypes/tracks.py Fri Nov 13 14:13:03 2009 -0500 @@ -2,11 +2,7 @@ Datatype classes for tracks/track views within galaxy. 
""" -import data -import logging -import re -import binascii -from cgi import escape +import tabular, binascii, logging from galaxy.datatypes.metadata import MetadataElement from galaxy.datatypes import metadata import galaxy.model @@ -17,7 +13,7 @@ log = logging.getLogger(__name__) -class GeneTrack( data.Binary ): +class GeneTrack( tabular.Tabular ): file_ext = "genetrack" MetadataElement( name="genetrack", default="data.genetrack", desc="HDF index", readonly=True, visible=True, no_value=0 ) diff -r 4f9b630f9976 -r 23c1b9799bce test/functional/test_get_data.py --- a/test/functional/test_get_data.py Fri Nov 13 10:39:15 2009 -0500 +++ b/test/functional/test_get_data.py Fri Nov 13 14:13:03 2009 -0500 @@ -4,128 +4,538 @@ from base.twilltestcase import TwillTestCase class UploadData( TwillTestCase ): - def test_000_upload_files_from_disk( self ): - """Test uploading data files from disk""" + def test_0005_upload_file( self ): + """Test uploading 1.bed, NOT setting the file format""" self.logout() self.login( email='test@bx.psu.edu' ) global admin_user admin_user = sa_session.query( galaxy.model.User ) \ .filter( galaxy.model.User.table.c.email=='test@bx.psu.edu' ) \ .one() - history1 = sa_session.query( galaxy.model.History ) \ - .filter( and_( galaxy.model.History.table.c.deleted==False, - galaxy.model.History.table.c.user_id==admin_user.id ) ) \ - .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ - .first() + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() self.upload_file( '1.bed' ) - hda1 = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert hda1 is not None, "Problem retrieving hda1 from database" - self.verify_dataset_correctness( 
'1.bed', hid=str( hda1.hid ) ) - self.upload_file( '2.bed', dbkey='hg17' ) - hda2 = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert hda2 is not None, "Problem retrieving hda2 from database" - self.verify_dataset_correctness( '2.bed', hid=str( hda2.hid ) ) - self.upload_file( '3.bed', dbkey='hg17', ftype='bed' ) - hda3 = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert hda3 is not None, "Problem retrieving hda3 from database" - self.verify_dataset_correctness( '3.bed', hid=str( hda3.hid ) ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( '1.bed', hid=str( hda.hid ) ) + self.check_history_for_string( "<th>1.Chrom</th><th>2.Start</th><th>3.End</th>" ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0010_upload_file( self ): + """Test uploading 4.bed.gz, manually setting the file format""" + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() self.upload_file( '4.bed.gz', dbkey='hg17', ftype='bed' ) - hda4 = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert hda4 is not None, "Problem retrieving hda4 from database" - self.verify_dataset_correctness( '4.bed', hid=str( hda4.hid ) ) + hda = sa_session.query( 
galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( '4.bed', hid=str( hda.hid ) ) + self.check_history_for_string( "<th>1.Chrom</th><th>2.Start</th><th>3.End</th>" ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0015_upload_file( self ): + """Test uploading 1.scf, manually setting the file format""" + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() self.upload_file( '1.scf', ftype='scf' ) - hda5 = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert hda5 is not None, "Problem retrieving hda5 from database" - self.verify_dataset_correctness( '1.scf', hid=str( hda5.hid ) ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( '1.scf', hid=str( hda.hid ) ) + self.check_history_for_string( "Binary scf sequence file</pre>" ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0020_upload_file( self ): + """Test uploading 1.scf, NOT setting the file format""" + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + 
.first() + self.upload_file( '1.scf' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.check_history_for_string( "File Format' to 'Scf' when uploading scf files" ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0025_upload_file( self ): + """Test uploading 1.scf.zip, manually setting the file format""" + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() self.upload_file( '1.scf.zip', ftype='binseq.zip' ) - hda6 = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert hda6 is not None, "Problem retrieving hda6 from database" - self.verify_dataset_correctness( '1.scf.zip', hid=str( hda6.hid ) ) - self.delete_history( id=self.security.encode_id( history1.id ) ) - def test_005_url_paste( self ): + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( '1.scf.zip', hid=str( hda.hid ) ) + self.check_history_for_string( "Archive of 1 binary sequence files</pre>" ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0030_upload_file( self ): + """Test uploading 1.scf.zip, NOT setting the file format""" + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( 
galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( '1.scf.zip' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.check_history_for_string( "'File Format' for archive consisting of binary files - use 'Binseq.zip'" ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0035_upload_file( self ): + """Test uploading 1.sam NOT setting the file format""" + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( '1.sam' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( '1.sam', hid=str( hda.hid ) ) + self.check_history_for_string( "<th>1.QNAME</th><th>2.FLAG</th><th>3.RNAME</th><th>4.POS</th>" ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0040_upload_file( self ): + """Test uploading 1.sff, NOT setting the file format""" + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( '1.sff' ) + hda = sa_session.query( 
galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( '1.sff', hid=str( hda.hid ) ) + self.check_history_for_string( 'format: <span class="sff">sff' ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0045_upload_file( self ): + """Test uploading 454Score.pdf, NOT setting the file format""" + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( '454Score.pdf' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.check_history_for_string( "The uploaded file contains inappropriate content" ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0050_upload_file( self ): + """Test uploading 454Score.png, NOT setting the file format""" + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( '454Score.png' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.check_history_for_string( "The uploaded file contains inappropriate content" 
) + def test_0055_upload_file( self ): + """Test uploading lped composite datatype file, manually setting the file format""" + # Logged in as admin_user + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + # lped data types include a ped_file and a map_file ( which is binary ) + self.upload_composite_datatype_file( 'lped', ped_file='tinywga.ped', map_file='tinywga.map', base_name='rgenetics' ) + # Get the latest hid for testing + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + # We'll test against the resulting ped file and map file for correctness + self.verify_composite_datatype_file_content( 'rgenetics.ped', str( hda.id ) ) + self.verify_composite_datatype_file_content( 'rgenetics.map', str( hda.id ) ) + self.check_history_for_string( "Uploaded Composite Dataset (lped)" ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0060_upload_file( self ): + """Test uploading pbed composite datatype file, manually setting the file format""" + # Logged in as admin_user + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + # pbed data types include a bim_file, a bed_file and a fam_file + self.upload_composite_datatype_file( 'pbed', bim_file='tinywga.bim', bed_file='tinywga.bed', fam_file='tinywga.fam', base_name='rgenetics' ) + # Get the latest hid for 
testing + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + # We'll test against the resulting ped file and map file for correctness + self.verify_composite_datatype_file_content( 'rgenetics.bim', str( hda.id ) ) + self.verify_composite_datatype_file_content( 'rgenetics.bed', str( hda.id ) ) + self.verify_composite_datatype_file_content( 'rgenetics.fam', str( hda.id ) ) + self.check_history_for_string( "Uploaded Composite Dataset (pbed)" ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0065_upload_file( self ): + """Test uploading asian_chars_1.txt, NOT setting the file format""" + # Logged in as admin_user + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( 'asian_chars_1.txt' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( 'asian_chars_1.txt', hid=str( hda.hid ) ) + self.check_history_for_string( 'uploaded multi-byte char file' ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0070_upload_file( self ): + """Test uploading 2gen.fastq, NOT setting the file format""" + # Logged in as admin_user + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( 
desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( '2gen.fastq' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( '2gen.fastq', hid=str( hda.hid ) ) + self.check_history_for_string( '2gen.fastq format: <span class="fastq">fastq</span>, database: \? Info: uploaded fastq file' ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0075_upload_file( self ): + """Test uploading 1.wig, NOT setting the file format""" + # Logged in as admin_user + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( '1.wig' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( '1.wig', hid=str( hda.hid ) ) + self.check_history_for_string( '1.wig format: <span class="wig">wig</span>, database: \? 
Info: uploaded file' ) + self.check_metadata_for_string( 'value="1.wig" value="\?"' ) + self.check_metadata_for_string( 'Change data type selected value="wig" selected="yes"' ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0080_upload_file( self ): + """Test uploading 1.tabular, NOT setting the file format""" + # Logged in as admin_user + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( '1.tabular' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( '1.tabular', hid=str( hda.hid ) ) + self.check_history_for_string( '1.tabular format: <span class="tabular">tabular</span>, database: \? 
Info: uploaded file' ) + self.check_metadata_for_string( 'value="1.tabular" value="\?"' ) + self.check_metadata_for_string( 'Change data type selected value="tabular" selected="yes"' ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0085_upload_file( self ): + """Test uploading qualscores.qualsolid, NOT setting the file format""" + # Logged in as admin_user + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( 'qualscores.qualsolid' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( 'qualscores.qualsolid', hid=str( hda.hid ) ) + self.check_history_for_string( '2.5 Kb, format: <span class="qualsolid">qualsolid</span>, database: \? 
Info: uploaded file' ) + self.check_metadata_for_string( 'Change data type value="qualsolid" selected="yes">qualsolid' ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0090_upload_file( self ): + """Test uploading qualscores.qual454, NOT setting the file format""" + # Logged in as admin_user + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( 'qualscores.qual454' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( 'qualscores.qual454', hid=str( hda.hid ) ) + self.check_history_for_string( '5.6 Kb, format: <span class="qual454">qual454</span>, database: \?' 
) + self.check_metadata_for_string( 'Change data type value="qual454" selected="yes">qual454' ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0095_upload_file( self ): + """Test uploading 3.maf, NOT setting the file format""" + # Logged in as admin_user + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( '3.maf' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( '3.maf', hid=str( hda.hid ) ) + self.check_history_for_string( '3.maf format: <span class="maf">maf</span>, database: \? 
Info: uploaded file' ) + self.check_metadata_for_string( 'value="3.maf" value="\?"' ) + self.check_metadata_for_string( 'Convert to new format <option value="interval">Convert MAF to Genomic Intervals <option value="fasta">Convert MAF to Fasta' ) + self.check_metadata_for_string( 'Change data type selected value="maf" selected="yes"' ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0100_upload_file( self ): + """Test uploading 1.lav, NOT setting the file format""" + # Logged in as admin_user + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( '1.lav' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( '1.lav', hid=str( hda.hid ) ) + self.check_history_for_string( '1.lav format: <span class="lav">lav</span>, database: \? 
Info: uploaded file' ) + self.check_metadata_for_string( 'value="1.lav" value="\?"' ) + self.check_metadata_for_string( 'Change data type selected value="lav" selected="yes"' ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0105_upload_file( self ): + """Test uploading 1.interval, NOT setting the file format""" + # Logged in as admin_user + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( '1.interval' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( '1.interval', hid=str( hda.hid ) ) + self.check_history_for_string( '1.interval format: <span class="interval">interval</span>, database: \? 
Info: uploaded file' ) + self.check_metadata_for_string( 'value="1.interval" value="\?"' ) + self.check_metadata_for_string( 'Chrom column: <option value="1" selected> Start column: <option value="2" selected>' ) + self.check_metadata_for_string( 'End column: <option value="3" selected> Strand column <option value="6" selected>' ) + self.check_metadata_for_string( 'Convert to new format <option value="bed">Convert Genomic Intervals To BED' ) + self.check_metadata_for_string( 'Change data type selected value="interval" selected="yes"' ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0110_upload_file( self ): + """Test uploading 5.gff3, NOT setting the file format""" + # Logged in as admin_user + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( '5.gff3' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( '5.gff3', hid=str( hda.hid ) ) + self.check_history_for_string( '5.gff3 format: <span class="gff3">gff3</span>, database: \? 
Info: uploaded file' ) + self.check_metadata_for_string( 'value="5.gff3" value="\?"' ) + self.check_metadata_for_string( 'Convert to new format <option value="bed">Convert GFF to BED' ) + self.check_metadata_for_string( 'Change data type selected value="gff3" selected="yes"' ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0115_upload_file( self ): + """Test uploading html_file.txt, NOT setting the file format""" + # Logged in as admin_user + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( 'html_file.txt' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.check_history_for_string( 'The uploaded file contains inappropriate content' ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0120_upload_file( self ): + """Test uploading 5.gff, NOT setting the file format""" + # Logged in as admin_user + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( '5.gff' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( '5.gff', hid=str( hda.hid ) ) + self.check_history_for_string( 
'5.gff format: <span class="gff">gff</span>, database: \? Info: uploaded file' ) + self.check_metadata_for_string( 'value="5.gff" value="\?"' ) + self.check_metadata_for_string( 'Convert to new format <option value="bed">Convert GFF to BED' ) + self.check_metadata_for_string( 'Change data type selected value="gff" selected="yes"' ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0125_upload_file( self ): + """Test uploading 1.fasta, NOT setting the file format""" + # Logged in as admin_user + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( '1.fasta' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( '1.fasta', hid=str( hda.hid ) ) + self.check_history_for_string( '1.fasta format: <span class="fasta">fasta</span>, database: \? Info: uploaded file' ) + self.check_metadata_for_string( 'value="1.fasta" value="\?" 
Change data type selected value="fasta" selected="yes"' ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0130_upload_file( self ): + """Test uploading 1.customtrack, NOT setting the file format""" + # Logged in as admin_user + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( '1.customtrack' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( '1.customtrack', hid=str( hda.hid ) ) + self.check_history_for_string( '1.customtrack format: <span class="customtrack">customtrack</span>, database: \? Info: uploaded file' ) + self.check_metadata_for_string( 'value="1.customtrack" value="\?" 
Change data type selected value="customtrack" selected="yes"' ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0135_upload_file( self ): + """Test uploading shrimp_cs_test1.csfasta, NOT setting the file format""" + # Logged in as admin_user + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( 'shrimp_cs_test1.csfasta' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( 'shrimp_cs_test1.csfasta', hid=str( hda.hid ) ) + self.check_history_for_string( '162.6 Kb, format: <span class="csfasta">csfasta</span>, <td>>2_14_26_F3,-1282216.0</td>' ) + self.check_metadata_for_string( 'value="shrimp_cs_test1.csfasta" value="\?" 
Change data type value="csfasta" selected="yes"' ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0140_upload_file( self ): + """Test uploading megablast_xml_parser_test1.gz, NOT setting the file format""" + # Logged in as admin_user + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( 'megablast_xml_parser_test1.gz' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.check_history_for_string( 'NCBI Blast XML data format: <span class="blastxml">blastxml</span>' ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0145_upload_file( self ): + """Test uploading 1.axt, NOT setting the file format""" + # Logged in as admin_user + self.check_history_for_string( 'Your history is empty' ) + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() + self.upload_file( '1.axt' ) + hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ + .first() + assert hda is not None, "Problem retrieving hda from database" + self.verify_dataset_correctness( '1.axt', hid=str( hda.hid ) ) + self.check_history_for_string( '1.axt format: <span class="axt">axt</span>, database: \? Info: uploaded file' ) + self.check_metadata_for_string( 'value="1.axt" value="\?" 
Change data type selected value="axt" selected="yes"' ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_0150_url_paste( self ): """Test url paste behavior""" # Logged in as admin_user # Deleting the current history should have created a new history self.check_history_for_string( 'Your history is empty' ) - history2 = sa_session.query( galaxy.model.History ) \ - .filter( and_( galaxy.model.History.table.c.deleted==False, - galaxy.model.History.table.c.user_id==admin_user.id ) ) \ - .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ - .first() + history = sa_session.query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted==False, + galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .first() self.upload_url_paste( 'hello world' ) self.check_history_for_string( 'Pasted Entry' ) self.check_history_for_string( 'hello world' ) self.upload_url_paste( u'hello world' ) self.check_history_for_string( 'Pasted Entry' ) self.check_history_for_string( 'hello world' ) - self.delete_history( id=self.security.encode_id( history2.id ) ) - def test_010_upload_lped_composite_datatype_files( self ): - """Test uploading lped composite datatype files""" - # Logged in as admin_user - self.check_history_for_string( 'Your history is empty' ) - history3 = sa_session.query( galaxy.model.History ) \ - .filter( and_( galaxy.model.History.table.c.deleted==False, - galaxy.model.History.table.c.user_id==admin_user.id ) ) \ - .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ - .first() - # lped data types include a ped_file and a map_file ( which is binary ) - self.upload_composite_datatype_file( 'lped', ped_file='tinywga.ped', map_file='tinywga.map', base_name='rgenetics' ) - # Get the latest hid for testing - hda1 = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( 
galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert hda1 is not None, "Problem retrieving hda1 from database" - # We'll test against the resulting ped file and map file for correctness - self.verify_composite_datatype_file_content( 'rgenetics.ped', str( hda1.id ) ) - self.verify_composite_datatype_file_content( 'rgenetics.map', str( hda1.id ) ) - self.delete_history( id=self.security.encode_id( history3.id ) ) - def test_015_upload_pbed_composite_datatype_files( self ): - """Test uploading pbed composite datatype files""" - # Logged in as admin_user - self.check_history_for_string( 'Your history is empty' ) - history4 = sa_session.query( galaxy.model.History ) \ - .filter( and_( galaxy.model.History.table.c.deleted==False, - galaxy.model.History.table.c.user_id==admin_user.id ) ) \ - .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ - .first() - # pbed data types include a bim_file, a bed_file and a fam_file - self.upload_composite_datatype_file( 'pbed', bim_file='tinywga.bim', bed_file='tinywga.bed', fam_file='tinywga.fam', base_name='rgenetics' ) - # Get the latest hid for testing - hda1 = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert hda1 is not None, "Problem retrieving hda1 from database" - # We'll test against the resulting ped file and map file for correctness - self.verify_composite_datatype_file_content( 'rgenetics.bim', str( hda1.id ) ) - self.verify_composite_datatype_file_content( 'rgenetics.bed', str( hda1.id ) ) - self.verify_composite_datatype_file_content( 'rgenetics.fam', str( hda1.id ) ) - self.delete_history( id=self.security.encode_id( history4.id ) ) - def test_020_upload_multibyte_character_file( self ): - """Test uploading multi-byte character file""" - # Logged in as admin_user - self.check_history_for_string( 'Your history is empty' ) - history5 = sa_session.query( 
galaxy.model.History ) \ - .filter( and_( galaxy.model.History.table.c.deleted==False, - galaxy.model.History.table.c.user_id==admin_user.id ) ) \ - .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ - .first() - self.upload_file( 'asian_chars_1.txt' ) - hda1 = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert hda1 is not None, "Problem retrieving hda1 from database" - self.verify_dataset_correctness( 'asian_chars_1.txt', hid=str( hda1.hid ) ) - self.check_history_for_string( 'uploaded multi-byte char file' ) - self.delete_history( id=self.security.encode_id( history5.id ) ) + self.delete_history( id=self.security.encode_id( history.id ) ) + def test_9999_clean_up( self ): + self.logout() \ No newline at end of file diff -r 4f9b630f9976 -r 23c1b9799bce test/functional/test_sniffing_and_metadata_settings.py --- a/test/functional/test_sniffing_and_metadata_settings.py Fri Nov 13 10:39:15 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,262 +0,0 @@ -import galaxy.model -from galaxy.model.orm import * -from galaxy.model.mapping import context as sa_session -from base.twilltestcase import TwillTestCase - -class SniffingAndMetaDataSettings( TwillTestCase ): - def test_000_axt_datatype( self ): - """Testing correctly sniffing axt data type upon upload""" - self.logout() - self.login( email='test@bx.psu.edu' ) - global admin_user - admin_user = sa_session.query( galaxy.model.User ).filter( galaxy.model.User.table.c.email=='test@bx.psu.edu' ).one() - self.new_history( name='history1' ) - global history1 - history1 = sa_session.query( galaxy.model.History ) \ - .filter( and_( galaxy.model.History.table.c.deleted==False, - galaxy.model.History.table.c.user_id==admin_user.id ) ) \ - .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ - .first() - assert history1 is not None, "Problem retrieving history1 from database" - 
self.upload_file( '1.axt' ) - self.verify_dataset_correctness( '1.axt' ) - self.check_history_for_string( '1.axt format: <span class="axt">axt</span>, database: \? Info: uploaded file' ) - self.check_metadata_for_string( 'value="1.axt" value="\?" Change data type selected value="axt" selected="yes"' ) - latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert latest_hda is not None, "Problem retrieving axt hda from the database" - if not latest_hda.name == '1.axt' and not latest_hda.extension == 'axt': - raise AssertionError, "axt data type was not correctly sniffed." - def test_005_bed_datatype( self ): - """Testing correctly sniffing bed data type upon upload""" - self.upload_file( '1.bed' ) - self.verify_dataset_correctness( '1.bed' ) - self.check_history_for_string( '1.bed format: <span class="bed">bed</span>, database: \? Info: uploaded file') - self.check_metadata_for_string( 'value="1.bed" value="\?"' ) - self.check_metadata_for_string( 'Chrom column: <option value="1" selected> Start column: <option value="2" selected>' ) - self.check_metadata_for_string( 'End column: <option value="3" selected> Strand column <option value="6" selected>' ) - self.check_metadata_for_string( 'Convert to new format value="bed">Convert Genomic Intervals To BED <option value="gff">Convert BED to GFF' ) - self.check_metadata_for_string( 'Change data type selected value="bed" selected="yes"' ) - latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert latest_hda is not None, "Problem retrieving bed hda from the database" - if not latest_hda.name == '1.bed' and not latest_hda.extension == 'bed': - raise AssertionError, "bed data type was not correctly sniffed." 
- def test_010_blastxml_datatype( self ): - """Testing correctly sniffing blastxml data type upon upload""" - self.upload_file( 'megablast_xml_parser_test1.gz' ) - self.check_history_for_string( 'NCBI Blast XML data format: <span class="blastxml">blastxml</span>' ) - latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert latest_hda is not None, "Problem retrieving blastxml hda from the database" - if not latest_hda.name == 'megablast_xml_parser_test1' and not latest_hda.extension == 'blastxml': - raise AssertionError, "blastxml data type was not correctly sniffed." - def test_015_csfasta_datatype( self ): - """Testing correctly sniffing csfasta data type upon upload""" - self.upload_file( 'shrimp_cs_test1.csfasta' ) - self.verify_dataset_correctness( 'shrimp_cs_test1.csfasta' ) - self.check_history_for_string( '162.6 Kb, format: <span class="csfasta">csfasta</span>, <td>>2_14_26_F3,-1282216.0</td>' ) - self.check_metadata_for_string( 'value="shrimp_cs_test1.csfasta" value="\?" Change data type value="csfasta" selected="yes"' ) - latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert latest_hda is not None, "Problem retrieving csfasta hda from the database" - if not latest_hda.name == 'shrimp_cs_test1.csfasta' and not latest_hda.extension == 'csfasta': - raise AssertionError, "csfasta data type was not correctly sniffed." - def test_020_customtrack_datatype( self ): - """Testing correctly sniffing customtrack data type upon upload""" - self.upload_file( '1.customtrack' ) - self.verify_dataset_correctness( '1.customtrack' ) - self.check_history_for_string( '1.customtrack format: <span class="customtrack">customtrack</span>, database: \? Info: uploaded file' ) - self.check_metadata_for_string( 'value="1.customtrack" value="\?" 
Change data type selected value="customtrack" selected="yes"' ) - latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert latest_hda is not None, "Problem retrieving customtrack hda from the database" - if not latest_hda.name == '1.customtrack' and not latest_hda.extension == 'customtrack': - raise AssertionError, "customtrack data type was not correctly sniffed." - def test_025_fasta_datatype( self ): - """Testing correctly sniffing fasta data type upon upload""" - self.upload_file( '1.fasta' ) - self.verify_dataset_correctness( '1.fasta' ) - self.check_history_for_string( '1.fasta format: <span class="fasta">fasta</span>, database: \? Info: uploaded file' ) - self.check_metadata_for_string( 'value="1.fasta" value="\?" Change data type selected value="fasta" selected="yes"' ) - latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert latest_hda is not None, "Problem retrieving fasta hda from the database" - if not latest_hda.name == '1.fasta' and not latest_hda.extension == 'fasta': - raise AssertionError, "fasta data type was not correctly sniffed." - def test_035_gff_datatype( self ): - """Testing correctly sniffing gff data type upon upload""" - self.upload_file( '5.gff' ) - self.verify_dataset_correctness( '5.gff' ) - self.check_history_for_string( '5.gff format: <span class="gff">gff</span>, database: \? 
Info: uploaded file' ) - self.check_metadata_for_string( 'value="5.gff" value="\?"' ) - self.check_metadata_for_string( 'Convert to new format <option value="bed">Convert GFF to BED' ) - self.check_metadata_for_string( 'Change data type selected value="gff" selected="yes"' ) - latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert latest_hda is not None, "Problem retrieving gff hda from the database" - if not latest_hda.name == '5.gff' and not latest_hda.extension == 'gff': - raise AssertionError, "gff data type was not correctly sniffed." - def test_040_gff3_datatype( self ): - """Testing correctly sniffing gff3 data type upon upload""" - self.upload_file( '5.gff3' ) - self.verify_dataset_correctness( '5.gff3' ) - self.check_history_for_string( '5.gff3 format: <span class="gff3">gff3</span>, database: \? Info: uploaded file' ) - self.check_metadata_for_string( 'value="5.gff3" value="\?"' ) - self.check_metadata_for_string( 'Convert to new format <option value="bed">Convert GFF to BED' ) - self.check_metadata_for_string( 'Change data type selected value="gff3" selected="yes"' ) - latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert latest_hda is not None, "Problem retrieving gff3 hda from the database" - if not latest_hda.name == '5.gff3' and not latest_hda.extension == 'gff3': - raise AssertionError, "gff3 data type was not correctly sniffed." - # TODO: the following test generates a data.hid == None, breaking this and all following tests - # I am not currently able to track down why, and uploading inappropriate files outside of the - # functional test framework seems to generate valid hids, so this needs to be tracked down and fixed - # ASAP, un-commenting this test. 
- #def test_045_html_datatype( self ): - #"""Testing correctly sniffing html data type upon upload""" - #self.upload_file( 'html_file.txt' ) - #self.check_history_for_string( 'An error occurred running this job: No data: you attempted to upload an inappropriate file.' ) - #latest_hda = galaxy.model.HistoryDatasetAssociation.query() \ - # .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ).first() - #assert latest_hda is not None, "Problem retrieving html hda from the database" - #if not latest_hda.name == 'html_file.txt' and not latest_hda.extension == 'data': - # raise AssertionError, "html data type was not correctly sniffed." - def test_050_interval_datatype( self ): - """Testing correctly sniffing interval data type upon upload""" - self.upload_file( '1.interval' ) - self.verify_dataset_correctness( '1.interval' ) - self.check_history_for_string( '1.interval format: <span class="interval">interval</span>, database: \? Info: uploaded file' ) - self.check_metadata_for_string( 'value="1.interval" value="\?"' ) - self.check_metadata_for_string( 'Chrom column: <option value="1" selected> Start column: <option value="2" selected>' ) - self.check_metadata_for_string( 'End column: <option value="3" selected> Strand column <option value="6" selected>' ) - self.check_metadata_for_string( 'Convert to new format <option value="bed">Convert Genomic Intervals To BED' ) - self.check_metadata_for_string( 'Change data type selected value="interval" selected="yes"' ) - latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert latest_hda is not None, "Problem retrieving interval hda from the database" - if not latest_hda.name == '1.interval' and not latest_hda.extension == 'interval': - raise AssertionError, "interval data type was not correctly sniffed." 
- def test_055_lav_datatype( self ): - """Testing correctly sniffing lav data type upon upload""" - self.upload_file( '1.lav' ) - self.verify_dataset_correctness( '1.lav' ) - self.check_history_for_string( '1.lav format: <span class="lav">lav</span>, database: \? Info: uploaded file' ) - self.check_metadata_for_string( 'value="1.lav" value="\?"' ) - self.check_metadata_for_string( 'Change data type selected value="lav" selected="yes"' ) - latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert latest_hda is not None, "Problem retrieving lav hda from the database" - if not latest_hda.name == '1.lav' and not latest_hda.extension == 'lav': - raise AssertionError, "lav data type was not correctly sniffed." - def test_060_maf_datatype( self ): - """Testing correctly sniffing maf data type upon upload""" - self.upload_file( '3.maf' ) - self.verify_dataset_correctness( '3.maf' ) - self.check_history_for_string( '3.maf format: <span class="maf">maf</span>, database: \? Info: uploaded file' ) - self.check_metadata_for_string( 'value="3.maf" value="\?"' ) - self.check_metadata_for_string( 'Convert to new format <option value="interval">Convert MAF to Genomic Intervals <option value="fasta">Convert MAF to Fasta' ) - self.check_metadata_for_string( 'Change data type selected value="maf" selected="yes"' ) - latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert latest_hda is not None, "Problem retrieving maf hda from the database" - if not latest_hda.name == '3.maf' and not latest_hda.extension == 'maf': - raise AssertionError, "maf data type was not correctly sniffed." 
- def test_065_qual454_datatype( self ): - """Testing correctly sniffing qual454 data type upon upload""" - self.upload_file( 'qualscores.qual454' ) - self.verify_dataset_correctness( 'qualscores.qual454' ) - self.check_history_for_string( '5.6 Kb, format: <span class="qual454">qual454</span>, database: \?' ) - self.check_metadata_for_string( 'Change data type value="qual454" selected="yes">qual454' ) - latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert latest_hda is not None, "Problem retrieving qual454 hda from the database" - if not latest_hda.name == 'qualscores.qual454' and not latest_hda.extension == 'qual454': - raise AssertionError, "qual454 data type was not correctly sniffed." - def test_070_qualsolid_datatype( self ): - """Testing correctly sniffing qualsolid data type upon upload""" - self.upload_file( 'qualscores.qualsolid' ) - self.verify_dataset_correctness('qualscores.qualsolid' ) - self.check_history_for_string('2.5 Kb, format: <span class="qualsolid">qualsolid</span>, database: \? Info: uploaded file' ) - self.check_metadata_for_string( 'Change data type value="qualsolid" selected="yes">qualsolid' ) - latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert latest_hda is not None, "Problem retrieving qualsolid hda from the database" - if not latest_hda.name == 'qualscores.qualsolid' and not latest_hda.extension == 'qualsolid': - raise AssertionError, "qualsolid data type was not correctly sniffed." - def test_075_tabular_datatype( self ): - """Testing correctly sniffing tabular data type upon upload""" - self.upload_file( '1.tabular' ) - self.verify_dataset_correctness( '1.tabular' ) - self.check_history_for_string( '1.tabular format: <span class="tabular">tabular</span>, database: \? 
Info: uploaded file' ) - self.check_metadata_for_string( 'value="1.tabular" value="\?"' ) - self.check_metadata_for_string( 'Change data type selected value="tabular" selected="yes"' ) - latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert latest_hda is not None, "Problem retrieving tabular hda from the database" - if not latest_hda.name == '1.tabular' and not latest_hda.extension == 'tabular': - raise AssertionError, "tabular data type was not correctly sniffed." - def test_080_wig_datatype( self ): - """Testing correctly sniffing wig data type upon upload""" - self.upload_file( '1.wig' ) - self.verify_dataset_correctness( '1.wig' ) - self.check_history_for_string( '1.wig format: <span class="wig">wig</span>, database: \? Info: uploaded file' ) - self.check_metadata_for_string( 'value="1.wig" value="\?"' ) - self.check_metadata_for_string( 'Change data type selected value="wig" selected="yes"' ) - latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert latest_hda is not None, "Problem retrieving wig hda from the database" - if not latest_hda.name == '1.wig' and not latest_hda.extension == 'wig': - raise AssertionError, "wig data type was not correctly sniffed." - def test_090_sam_datatype( self ): - """Testing correctly sniffing sam format upon upload""" - self.upload_file( '1.sam' ) - self.verify_dataset_correctness( '1.sam' ) - self.check_history_for_string( '1.sam format: <span class="sam">sam</span>, database: \? 
Info: uploaded sam file' ) - latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert latest_hda is not None, "Problem retrieving sam hda from the database" - if not latest_hda.name == '1.sam' and not latest_hda.extension == 'sam': - raise AssertionError, "sam data type was not correctly sniffed." - def test_095_fastq_datatype( self ): - """Testing correctly sniffing fastq ( generic ) data type upon upload""" - self.upload_file( '2gen.fastq' ) - self.verify_dataset_correctness( '2gen.fastq' ) - self.check_history_for_string( '2gen.fastq format: <span class="fastq">fastq</span>, database: \? Info: uploaded fastq file' ) - latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert latest_hda is not None, "Problem retrieving fastq hda from the database" - if not latest_hda.name == '2gen.fastq' and not latest_hda.extension == 'fastq': - raise AssertionError, "fastq data type was not correctly sniffed." - def test_0100_sff_datatype( self ): - """Testing correctly sniffing sff format upon upload""" - self.upload_file( '1.sff' ) - self.verify_dataset_correctness( '1.sff' ) - self.check_history_for_string( 'format: <span class="sff">sff' ) - latest_hda = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ - .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ - .first() - assert latest_hda is not None, "Problem retrieving sff hda from the database" - if not latest_hda.name == '1.sff' and not latest_hda.extension == 'sff': - raise AssertionError, "sff data type was not correctly sniffed." 
- def test_9999_clean_up( self ): - self.delete_history( id=self.security.encode_id( history1.id ) ) - self.logout() diff -r 4f9b630f9976 -r 23c1b9799bce tools/data_source/upload.py --- a/tools/data_source/upload.py Fri Nov 13 10:39:15 2009 -0500 +++ b/tools/data_source/upload.py Fri Nov 13 14:13:03 2009 -0500 @@ -9,6 +9,7 @@ # need to import model before sniff to resolve a circular import dependency import galaxy.model from galaxy.datatypes import sniff +from galaxy.datatypes.binary import sniffable_binary_formats, unsniffable_binary_formats from galaxy import util from galaxy.util.json import * @@ -200,25 +201,29 @@ ext = dataset.file_type if not data_type: if check_binary( dataset.path ): - if dataset.is_binary is not None: - data_type = 'binary' - ext = dataset.file_type - else: - parts = dataset.name.split( "." ) - if len( parts ) > 1: - ext = parts[1].strip().lower() - if not( ext == 'ab1' or ext == 'scf' ): - file_err( 'The uploaded file contains inappropriate content', dataset, json_file ) + data_type = 'binary' + binary_ok = False + parts = dataset.name.split( "." ) + if len( parts ) > 1: + ext = parts[1].strip().lower() + if ext in unsniffable_binary_formats and dataset.file_type == ext: + binary_ok = True + elif ext in unsniffable_binary_formats and dataset.file_type != ext: + err_msg = "You must manually set the 'File Format' to '%s' when uploading %s files." % ( ext.capitalize(), ext ) + file_err( err_msg, dataset, json_file ) + return + if not binary_ok and ext in sniffable_binary_formats: + # Sniff the file to confirm it's data type + tmp_ext = sniff.guess_ext( dataset.path ) + if tmp_ext == ext: + binary_ok = True + else: + err_msg = "The content of the file does not match its type (%s)." 
% ext.capitalize() + file_err( err_msg, dataset, json_file ) return - if ext == 'ab1' and dataset.file_type != 'ab1': - file_err( "You must manually set the 'File Format' to 'Ab1' when uploading ab1 files.", dataset, json_file ) - return - elif ext == 'scf' and dataset.file_type != 'scf': - file_err( "You must manually set the 'File Format' to 'Scf' when uploading scf files.", dataset, json_file ) - return - else: - ext = 'binary' - data_type = 'binary' + if not binary_ok: + file_err( 'The uploaded file contains inappropriate content', dataset, json_file ) + return if not data_type: # We must have a text file if check_html( dataset.path ): @@ -234,10 +239,6 @@ else: ext = dataset.file_type data_type = ext - elif data_type == 'binary' and ext == 'auto': - # currently we are only sniffing sff binary files - ext = sniff.guess_ext( dataset.path ) - data_type = ext # Save job info for the framework if ext == 'auto' and dataset.ext: ext = dataset.ext
participants (1)
-
Greg Von Kuster