1 new changeset in galaxy-central:

http://bitbucket.org/galaxy/galaxy-central/changeset/c2bc73b017cb/
changeset:   c2bc73b017cb
branches:
user:        greg
date:        2011-06-03 21:36:59
summary:     Move duplicate data type checker methods from sniff and upload into a new ~/datatypes/checkers.py.
affected #:  3 files (4.4 KB)

--- a/lib/galaxy/datatypes/sniff.py	Fri Jun 03 15:18:31 2011 -0400
+++ b/lib/galaxy/datatypes/sniff.py	Fri Jun 03 15:36:59 2011 -0400
@@ -4,6 +4,7 @@
 import logging, sys, os, csv, tempfile, shutil, re, zipfile, gzip
 import registry
 from galaxy import util
+from galaxy.datatypes.checkers import *
 from galaxy.datatypes.binary import unsniffable_binary_formats

 log = logging.getLogger(__name__)
@@ -319,59 +320,6 @@
         return 'tabular'    #default tabular data type file extension
     return 'txt'            #default text data type file extension

-
-#Methods Used below can be used to upload new datasets into Galaxy. Currently used by the data_source.py script/tools.
-#These should be further abstracted and merged with upload.py script/tool functionality.
-def is_gzip( filename ):
-    temp = open( filename, "U" )
-    magic_check = temp.read( 2 )
-    temp.close()
-    if magic_check != util.gzip_magic:
-        return False
-    return True
-
-
-def is_binary( filename ):
-    is_binary = False
-    temp = open( filename, "U" )
-    chars_read = 0
-    for chars in temp:
-        for char in chars:
-            chars_read += 1
-            if ord( char ) > 128:
-                is_binary = True
-                break
-            if chars_read > 100:
-                break
-        if chars_read > 100:
-            break
-    temp.close()
-    return is_binary
-
-def is_html( temp_name, chunk=None ):
-    if chunk is None:
-        temp = open(temp_name, "U")
-    else:
-        temp = chunk
-    regexp1 = re.compile( "<A\s+[^>]*HREF[^>]+>", re.I )
-    regexp2 = re.compile( "<IFRAME[^>]*>", re.I )
-    regexp3 = re.compile( "<FRAMESET[^>]*>", re.I )
-    regexp4 = re.compile( "<META[^>]*>", re.I )
-    regexp5 = re.compile( "<SCRIPT[^>]*>", re.I )
-    lineno = 0
-    for line in temp:
-        lineno += 1
-        matches = regexp1.search( line ) or regexp2.search( line ) or regexp3.search( line ) or regexp4.search( line ) or regexp5.search( line )
-        if matches:
-            if chunk is None:
-                temp.close()
-            return True
-        if lineno > 100:
-            break
-    if chunk is None:
-        temp.close()
-    return False
-
 def handle_compressed_file( filename, datatypes_registry, ext = 'auto' ):
     CHUNK_SIZE = 2**20 # 1Mb
     is_compressed = False
@@ -429,10 +377,10 @@
     if ext in AUTO_DETECT_EXTENSIONS:
         ext = guess_ext( filename, sniff_order = datatypes_registry.sniff_order, is_multi_byte=is_multi_byte )
-    if is_binary( filename ):
+    if check_binary( filename ):
         if ext not in unsniffable_binary_formats and not datatypes_registry.get_datatype_by_extension( ext ).sniff( filename ):
             raise InappropriateDatasetContentError, 'The binary uploaded file contains inappropriate content.'
-    elif is_html( filename ):
+    elif check_html( filename ):
         raise InappropriateDatasetContentError, 'The uploaded file contains inappropriate HTML content.'
     return ext
@@ -449,4 +397,3 @@
 if __name__ == '__main__':
     import doctest, sys
     doctest.testmod(sys.modules[__name__])
-

--- a/tools/data_source/upload.py	Fri Jun 03 15:18:31 2011 -0400
+++ b/tools/data_source/upload.py	Fri Jun 03 15:36:59 2011 -0400
@@ -8,6 +8,7 @@
 from galaxy import eggs
 # need to import model before sniff to resolve a circular import dependency
 import galaxy.model
+from galaxy.datatypes.checkers import *
 from galaxy.datatypes import sniff
 from galaxy.datatypes.binary import *
 from galaxy.datatypes.images import Pdf
@@ -48,104 +49,20 @@
         return [safe_dict(x) for x in d]
     else:
         return d
-def check_html( temp_name, chunk=None ):
-    if chunk is None:
-        temp = open(temp_name, "U")
-    else:
-        temp = chunk
-    regexp1 = re.compile( "<A\s+[^>]*HREF[^>]+>", re.I )
-    regexp2 = re.compile( "<IFRAME[^>]*>", re.I )
-    regexp3 = re.compile( "<FRAMESET[^>]*>", re.I )
-    regexp4 = re.compile( "<META[^>]*>", re.I )
-    regexp5 = re.compile( "<SCRIPT[^>]*>", re.I )
-    lineno = 0
-    for line in temp:
-        lineno += 1
-        matches = regexp1.search( line ) or regexp2.search( line ) or regexp3.search( line ) or regexp4.search( line ) or regexp5.search( line )
-        if matches:
-            if chunk is None:
-                temp.close()
-            return True
-        if lineno > 100:
-            break
-    if chunk is None:
-        temp.close()
-    return False
-def check_binary( temp_name ):
-    is_binary = False
-    temp = open( temp_name, "U" )
-    chars_read = 0
-    for chars in temp:
-        for char in chars:
-            chars_read += 1
-            if ord( char ) > 128:
-                is_binary = True
-                break
-            if chars_read > 100:
-                break
-        if chars_read > 100:
-            break
-    temp.close()
-    return is_binary
-def check_bam( temp_name ):
-    return Bam().sniff( temp_name )
-def check_sff( temp_name ):
-    return Sff().sniff( temp_name )
-def check_pdf( temp_name ):
-    return Pdf().sniff( temp_name )
-def check_bigwig( temp_name ):
-    return BigWig().sniff( temp_name )
-def check_bigbed( temp_name ):
-    return BigBed().sniff( temp_name )
-def check_gzip( temp_name ):
-    # This method returns a tuple of booleans representing ( is_gzipped, is_valid )
-    # Make sure we have a gzipped file
-    try:
-        temp = open( temp_name, "U" )
-        magic_check = temp.read( 2 )
-        temp.close()
-        if magic_check != util.gzip_magic:
-            return ( False, False )
-    except:
-        return ( False, False )
-    # We support some binary data types, so check if the compressed binary file is valid
-    # If the file is Bam, it should already have been detected as such, so we'll just check
-    # for sff format.
-    try:
-        header = gzip.open( temp_name ).read(4)
-        if binascii.b2a_hex( header ) == binascii.hexlify( '.sff' ):
-            return ( True, True )
-    except:
-        return( False, False )
-    CHUNK_SIZE = 2**15 # 32Kb
-    gzipped_file = gzip.GzipFile( temp_name, mode='rb' )
-    chunk = gzipped_file.read( CHUNK_SIZE )
-    gzipped_file.close()
-    # See if we have a compressed HTML file
-    if check_html( temp_name, chunk=chunk ):
-        return ( True, False )
-    return ( True, True )
-def check_bz2( temp_name ):
-    try:
-        temp = open( temp_name, "U" )
-        magic_check = temp.read( 3 )
-        temp.close()
-        if magic_check != util.bz2_magic:
-            return ( False, False )
-    except:
-        return( False, False )
-    CHUNK_SIZE = 2**15 # 32Kb
-    bzipped_file = bz2.BZ2File( temp_name, mode='rb' )
-    chunk = bzipped_file.read( CHUNK_SIZE )
-    bzipped_file.close()
-    # See if we have a compressed HTML file
-    if check_html( temp_name, chunk=chunk ):
-        return ( True, False )
-    return ( True, True )
-def check_zip( temp_name ):
-    if zipfile.is_zipfile( temp_name ):
-        return True
-    return False
+def check_bam( file_path ):
+    return Bam().sniff( file_path )
+
+def check_sff( file_path ):
+    return Sff().sniff( file_path )
+
+def check_pdf( file_path ):
+    return Pdf().sniff( file_path )
+
+def check_bigwig( file_path ):
+    return BigWig().sniff( file_path )
+
+def check_bigbed( file_path ):
+    return BigBed().sniff( file_path )
 def parse_outputs( args ):
     rval = {}
     for arg in args:

Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--

This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
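[Editor's note] The third affected file, the new lib/galaxy/datatypes/checkers.py, is not shown in the diffs above. For illustration only, below is a minimal sketch of what such a module could contain, reconstructed from the functions removed from sniff.py and upload.py in this changeset (check_binary and check_html are the names called by the updated sniff.py) and written in the same Python 2 style; it is not the committed file. Assumptions: the GZIP_MAGIC literal stands in for galaxy.util.gzip_magic, check_html() splits a pre-read string chunk into lines before scanning, and check_bz2() plus the sff-specific handling in check_gzip() are omitted for brevity.

    # Hypothetical sketch only -- not the actual contents of the new checkers.py.
    import gzip, re, zipfile

    GZIP_MAGIC = '\037\213'  # assumption: stands in for galaxy.util.gzip_magic

    def check_binary( file_path ):
        # Call the file binary if any of the first ~100 characters is non-ASCII.
        is_binary = False
        temp = open( file_path, "U" )
        chars_read = 0
        for line in temp:
            for char in line:
                chars_read += 1
                if ord( char ) > 128:
                    is_binary = True
                    break
                if chars_read > 100:
                    break
            if is_binary or chars_read > 100:
                break
        temp.close()
        return is_binary

    def check_html( file_path, chunk=None ):
        # Scan up to 100 lines (or a pre-read chunk) for tags that indicate HTML content.
        if chunk is None:
            temp = open( file_path, "U" )
        else:
            # assumption: split a pre-read string into lines so the scan below sees lines
            temp = chunk.splitlines( True )
        patterns = ( "<A\s+[^>]*HREF[^>]+>", "<IFRAME[^>]*>", "<FRAMESET[^>]*>", "<META[^>]*>", "<SCRIPT[^>]*>" )
        regexps = [ re.compile( pattern, re.I ) for pattern in patterns ]
        found = False
        lineno = 0
        for line in temp:
            lineno += 1
            if any( regexp.search( line ) for regexp in regexps ):
                found = True
                break
            if lineno > 100:
                break
        if chunk is None:
            temp.close()
        return found

    def check_gzip( file_path ):
        # Returns ( is_gzipped, is_valid ); a gzipped file whose decompressed head
        # looks like HTML is reported as gzipped but not valid for upload.
        try:
            temp = open( file_path, "rb" )
            magic_check = temp.read( 2 )
            temp.close()
            if magic_check != GZIP_MAGIC:
                return ( False, False )
        except Exception:
            return ( False, False )
        CHUNK_SIZE = 2**15  # 32Kb
        gzipped_file = gzip.GzipFile( file_path, mode='rb' )
        chunk = gzipped_file.read( CHUNK_SIZE )
        gzipped_file.close()
        if check_html( file_path, chunk=chunk ):
            return ( True, False )
        return ( True, True )

    def check_zip( file_path ):
        return zipfile.is_zipfile( file_path )

With a module along these lines, both sniff.py and upload.py can share one implementation via "from galaxy.datatypes.checkers import *", which is the point of this changeset.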