1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/04de30a1d852/ Changeset: 04de30a1d852 Branch: next-stable User: dan Date: 2014-08-01 21:46:53 Summary: Fix the MAF stitching tools' handling of the number of open temporary files. Fixes https://trello.com/c/UgqThlrh and https://trello.com/c/CQePdepc. Affected #: 1 file diff -r 2559a58e62f292ae537974f8a3fc86ea32ccf8ad -r 04de30a1d852400f5935e2ea20c02cf3e3416c58 lib/galaxy/tools/util/maf_utilities.py --- a/lib/galaxy/tools/util/maf_utilities.py +++ b/lib/galaxy/tools/util/maf_utilities.py @@ -9,6 +9,8 @@ import bx.interval_index_file import sys, os, string, tempfile import logging +from errno import EMFILE +import resource from copy import deepcopy assert sys.version_info[:2] >= ( 2, 4 ) @@ -45,15 +47,96 @@ print >> sys.stderr, "Fatal Error: %s" % msg sys.exit() + +class TempFileHandler( object ): + ''' + Handles creating, opening, closing, and deleting of Temp files, with a + maximum number of files open at one time.
+ ''' + + DEFAULT_MAX_OPEN_FILES = max( resource.getrlimit( resource.RLIMIT_NOFILE )[0] / 2, 1 ) + + def __init__( self, max_open_files=None, **kwds ): + if max_open_files is None: + max_open_files = self.DEFAULT_MAX_OPEN_FILES + self.max_open_files = max_open_files + self.files = [] + self.open_file_indexes = [] + self.kwds = kwds + + def get_open_tempfile( self, index=None, **kwds ): + if index is not None and index in self.open_file_indexes: + self.open_file_indexes.remove( index ) + else: + if self.max_open_files: + while len( self.open_file_indexes ) >= self.max_open_files: + self.close( self.open_file_indexes[0] ) + if index is None: + index = len( self.files ) + temp_kwds = dict( self.kwds ) + temp_kwds.update( kwds ) + # Being able to use delete=True here, would simplify a bit, + # but we support python2.4 in these tools + while True: + try: + tmp_file = tempfile.NamedTemporaryFile( **temp_kwds ) + filename = tmp_file.name + break + except OSError, e: + if self.open_file_indexes and e.errno == EMFILE: + self.max_open_files = len( self.open_file_indexes ) + self.close( self.open_file_indexes[0] ) + else: + raise e + tmp_file.close() + self.files.append( open( filename, 'w+b' ) ) + else: + while True: + try: + self.files[ index ] = open( self.files[ index ].name, 'r+b' ) + break + except OSError, e: + if self.open_file_indexes and e.errno == EMFILE: + self.max_open_files = len( self.open_file_indexes ) + self.close( self.open_file_indexes[0] ) + else: + raise e + self.files[ index ].seek( 0, 2 ) + self.open_file_indexes.append( index ) + return index, self.files[ index ] + + def close( self, index, delete=False ): + if index in self.open_file_indexes: + self.open_file_indexes.remove( index ) + rval = self.files[ index ].close() + if delete: + try: + os.unlink( self.files[ index ].name ) + except OSError: + pass + return rval + + def flush( self, index ): + if index in self.open_file_indexes: + self.files[ index ].flush() + + def __del__( self ): + for i in 
xrange( len( self.files ) ): + self.close( i, delete=True ) + + #an object corresponding to a reference layered alignment class RegionAlignment( object ): DNA_COMPLEMENT = string.maketrans( "ACGTacgt", "TGCAtgca" ) MAX_SEQUENCE_SIZE = sys.maxint #Maximum length of sequence allowed - def __init__( self, size, species = [] ): + def __init__( self, size, species = [], temp_file_handler = None ): assert size <= self.MAX_SEQUENCE_SIZE, "Maximum length allowed for an individual sequence has been exceeded (%i > %i)." % ( size, self.MAX_SEQUENCE_SIZE ) self.size = size + if not temp_file_handler: + temp_file_handler = TempFileHandler() + self.temp_file_handler = temp_file_handler self.sequences = {} if not isinstance( species, list ): species = [species] @@ -63,8 +146,9 @@ #add a species to the alignment def add_species( self, species ): #make temporary sequence files - self.sequences[species] = tempfile.TemporaryFile() - self.sequences[species].write( "-" * self.size ) + file_index, fh = self.temp_file_handler.get_open_tempfile() + self.sequences[species] = file_index + fh.write( "-" * self.size ) #returns the names for species found in alignment, skipping names as requested def get_species_names( self, skip = [] ): @@ -77,8 +161,9 @@ #returns the sequence for a species def get_sequence( self, species ): - self.sequences[species].seek( 0 ) - return self.sequences[species].read() + file_index, fh = self.temp_file_handler.get_open_tempfile( self.sequences[species] ) + fh.seek( 0 ) + return fh.read() #returns the reverse complement of the sequence for a species def get_sequence_reverse_complement( self, species ): @@ -95,8 +180,9 @@ if index >= self.size or index < 0: raise Exception( "Your index (%i) is out of range (0 - %i)." % ( index, self.size - 1 ) ) if len( bases ) == 0: raise Exception( "A set of genomic positions can only have a positive length." 
) if species not in self.sequences.keys(): self.add_species( species ) - self.sequences[species].seek( index ) - self.sequences[species].write( bases ) + file_index, fh = self.temp_file_handler.get_open_tempfile( self.sequences[species] ) + fh.seek( index ) + fh.write( bases ) #Flush temp file of specified species, or all species def flush( self, species = None ): @@ -105,12 +191,12 @@ elif not isinstance( species, list ): species = [species] for spec in species: - self.sequences[spec].flush() + self.temp_file_handler.flush( self.sequences[spec] ) class GenomicRegionAlignment( RegionAlignment ): - def __init__( self, start, end, species = [] ): - RegionAlignment.__init__( self, end - start, species ) + def __init__( self, start, end, species = [], temp_file_handler = None ): + RegionAlignment.__init__( self, end - start, species, temp_file_handler=temp_file_handler ) self.start = start self.end = end @@ -118,15 +204,18 @@ DNA_COMPLEMENT = string.maketrans( "ACGTacgt", "TGCAtgca" ) - def __init__( self, exon_starts, exon_ends, species = [] ): + def __init__( self, exon_starts, exon_ends, species = [], temp_file_handler = None ): if not isinstance( exon_starts, list ): exon_starts = [exon_starts] if not isinstance( exon_ends, list ): exon_ends = [exon_ends] assert len( exon_starts ) == len( exon_ends ), "The number of starts does not match the number of sizes." 
self.exons = [] + if not temp_file_handler: + temp_file_handler = TempFileHandler() + self.temp_file_handler = temp_file_handler for i in range( len( exon_starts ) ): - self.exons.append( GenomicRegionAlignment( exon_starts[i], exon_ends[i], species ) ) + self.exons.append( GenomicRegionAlignment( exon_starts[i], exon_ends[i], species, temp_file_handler=temp_file_handler ) ) #returns the names for species found in alignment, skipping names as requested def get_species_names( self, skip = [] ): @@ -140,14 +229,20 @@ #returns the sequence for a species def get_sequence( self, species ): - sequence = tempfile.TemporaryFile() + index, fh = self.temp_file_handler.get_open_tempfile() for exon in self.exons: if species in exon.get_species_names(): - sequence.write( exon.get_sequence( species ) ) + seq = exon.get_sequence( species ) + # we need to refetch fh here, since exon.get_sequence( species ) uses a tempfile + # and if max==1, it would close fh + index, fh = self.temp_file_handler.get_open_tempfile( index ) + fh.write( seq ) else: - sequence.write( "-" * exon.size ) - sequence.seek( 0 ) - return sequence.read() + fh.write( "-" * exon.size ) + fh.seek( 0 ) + rval = fh.read() + self.temp_file_handler.close( index, delete=True ) + return rval #returns the reverse complement of the sequence for a species def get_sequence_reverse_complement( self, species ): @@ -366,9 +461,9 @@ yield block, idx, offset #returns a filled region alignment for specified regions -def get_region_alignment( index, primary_species, chrom, start, end, strand = '+', species = None, mincols = 0, overwrite_with_gaps = True ): - if species is not None: alignment = RegionAlignment( end - start, species ) - else: alignment = RegionAlignment( end - start, primary_species ) +def get_region_alignment( index, primary_species, chrom, start, end, strand = '+', species = None, mincols = 0, overwrite_with_gaps = True, temp_file_handler = None ): + if species is not None: alignment = RegionAlignment( end - 
start, species, temp_file_handler=temp_file_handler ) + else: alignment = RegionAlignment( end - start, primary_species, temp_file_handler=temp_file_handler ) return fill_region_alignment( alignment, index, primary_species, chrom, start, end, strand, species, mincols, overwrite_with_gaps ) #reduces a block to only positions exisiting in the src provided @@ -437,10 +532,10 @@ return alignment #returns a filled spliced region alignment for specified region with start and end lists -def get_spliced_region_alignment( index, primary_species, chrom, starts, ends, strand = '+', species = None, mincols = 0, overwrite_with_gaps = True ): +def get_spliced_region_alignment( index, primary_species, chrom, starts, ends, strand = '+', species = None, mincols = 0, overwrite_with_gaps = True, temp_file_handler = None ): #create spliced alignment object - if species is not None: alignment = SplicedAlignment( starts, ends, species ) - else: alignment = SplicedAlignment( starts, ends, [primary_species] ) + if species is not None: alignment = SplicedAlignment( starts, ends, species, temp_file_handler=temp_file_handler ) + else: alignment = SplicedAlignment( starts, ends, [primary_species], temp_file_handler=temp_file_handler ) for exon in alignment.exons: fill_region_alignment( exon, index, primary_species, chrom, exon.start, exon.end, strand, species, mincols, overwrite_with_gaps ) return alignment Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.