1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/04de30a1d852/
Changeset: 04de30a1d852
Branch: next-stable
User: dan
Date: 2014-08-01 21:46:53
Summary: Fix for the MAF stitching tools' handling of the number of open temporary files.
Fixes
https://trello.com/c/UgqThlrh and
https://trello.com/c/CQePdepc.
Affected #: 1 file
diff -r 2559a58e62f292ae537974f8a3fc86ea32ccf8ad -r 04de30a1d852400f5935e2ea20c02cf3e3416c58 lib/galaxy/tools/util/maf_utilities.py
--- a/lib/galaxy/tools/util/maf_utilities.py
+++ b/lib/galaxy/tools/util/maf_utilities.py
@@ -9,6 +9,8 @@
import bx.interval_index_file
import sys, os, string, tempfile
import logging
+from errno import EMFILE
+import resource
from copy import deepcopy
assert sys.version_info[:2] >= ( 2, 4 )
@@ -45,15 +47,96 @@
print >> sys.stderr, "Fatal Error: %s" % msg
sys.exit()
+
+class TempFileHandler( object ):
+ '''
+ Handles creating, opening, closing, and deleting of Temp files, with a
+ maximum number of files open at one time.
+ '''
+
+ DEFAULT_MAX_OPEN_FILES = max( resource.getrlimit( resource.RLIMIT_NOFILE )[0] / 2, 1 )
+
+ def __init__( self, max_open_files=None, **kwds ):
+ if max_open_files is None:
+ max_open_files = self.DEFAULT_MAX_OPEN_FILES
+ self.max_open_files = max_open_files
+ self.files = []
+ self.open_file_indexes = []
+ self.kwds = kwds
+
+ def get_open_tempfile( self, index=None, **kwds ):
+ if index is not None and index in self.open_file_indexes:
+ self.open_file_indexes.remove( index )
+ else:
+ if self.max_open_files:
+ while len( self.open_file_indexes ) >= self.max_open_files:
+ self.close( self.open_file_indexes[0] )
+ if index is None:
+ index = len( self.files )
+ temp_kwds = dict( self.kwds )
+ temp_kwds.update( kwds )
+ # Being able to use delete=True here, would simplify a bit,
+ # but we support python2.4 in these tools
+ while True:
+ try:
+ tmp_file = tempfile.NamedTemporaryFile( **temp_kwds )
+ filename = tmp_file.name
+ break
+ except OSError, e:
+ if self.open_file_indexes and e.errno == EMFILE:
+ self.max_open_files = len( self.open_file_indexes )
+ self.close( self.open_file_indexes[0] )
+ else:
+ raise e
+ tmp_file.close()
+ self.files.append( open( filename, 'w+b' ) )
+ else:
+ while True:
+ try:
+ self.files[ index ] = open( self.files[ index ].name, 'r+b' )
+ break
+ except OSError, e:
+ if self.open_file_indexes and e.errno == EMFILE:
+ self.max_open_files = len( self.open_file_indexes )
+ self.close( self.open_file_indexes[0] )
+ else:
+ raise e
+ self.files[ index ].seek( 0, 2 )
+ self.open_file_indexes.append( index )
+ return index, self.files[ index ]
+
+ def close( self, index, delete=False ):
+ if index in self.open_file_indexes:
+ self.open_file_indexes.remove( index )
+ rval = self.files[ index ].close()
+ if delete:
+ try:
+ os.unlink( self.files[ index ].name )
+ except OSError:
+ pass
+ return rval
+
+ def flush( self, index ):
+ if index in self.open_file_indexes:
+ self.files[ index ].flush()
+
+ def __del__( self ):
+ for i in xrange( len( self.files ) ):
+ self.close( i, delete=True )
+
+
#an object corresponding to a reference layered alignment
class RegionAlignment( object ):
DNA_COMPLEMENT = string.maketrans( "ACGTacgt", "TGCAtgca" )
MAX_SEQUENCE_SIZE = sys.maxint #Maximum length of sequence allowed
- def __init__( self, size, species = [] ):
+ def __init__( self, size, species = [], temp_file_handler = None ):
assert size <= self.MAX_SEQUENCE_SIZE, "Maximum length allowed for an
individual sequence has been exceeded (%i > %i)." % ( size, self.MAX_SEQUENCE_SIZE
)
self.size = size
+ if not temp_file_handler:
+ temp_file_handler = TempFileHandler()
+ self.temp_file_handler = temp_file_handler
self.sequences = {}
if not isinstance( species, list ):
species = [species]
@@ -63,8 +146,9 @@
#add a species to the alignment
def add_species( self, species ):
#make temporary sequence files
- self.sequences[species] = tempfile.TemporaryFile()
- self.sequences[species].write( "-" * self.size )
+ file_index, fh = self.temp_file_handler.get_open_tempfile()
+ self.sequences[species] = file_index
+ fh.write( "-" * self.size )
#returns the names for species found in alignment, skipping names as requested
def get_species_names( self, skip = [] ):
@@ -77,8 +161,9 @@
#returns the sequence for a species
def get_sequence( self, species ):
- self.sequences[species].seek( 0 )
- return self.sequences[species].read()
+ file_index, fh = self.temp_file_handler.get_open_tempfile(
self.sequences[species] )
+ fh.seek( 0 )
+ return fh.read()
#returns the reverse complement of the sequence for a species
def get_sequence_reverse_complement( self, species ):
@@ -95,8 +180,9 @@
if index >= self.size or index < 0: raise Exception( "Your index (%i)
is out of range (0 - %i)." % ( index, self.size - 1 ) )
if len( bases ) == 0: raise Exception( "A set of genomic positions can only
have a positive length." )
if species not in self.sequences.keys(): self.add_species( species )
- self.sequences[species].seek( index )
- self.sequences[species].write( bases )
+ file_index, fh = self.temp_file_handler.get_open_tempfile(
self.sequences[species] )
+ fh.seek( index )
+ fh.write( bases )
#Flush temp file of specified species, or all species
def flush( self, species = None ):
@@ -105,12 +191,12 @@
elif not isinstance( species, list ):
species = [species]
for spec in species:
- self.sequences[spec].flush()
+ self.temp_file_handler.flush( self.sequences[spec] )
class GenomicRegionAlignment( RegionAlignment ):
- def __init__( self, start, end, species = [] ):
- RegionAlignment.__init__( self, end - start, species )
+ def __init__( self, start, end, species = [], temp_file_handler = None ):
+ RegionAlignment.__init__( self, end - start, species,
temp_file_handler=temp_file_handler )
self.start = start
self.end = end
@@ -118,15 +204,18 @@
DNA_COMPLEMENT = string.maketrans( "ACGTacgt", "TGCAtgca" )
- def __init__( self, exon_starts, exon_ends, species = [] ):
+ def __init__( self, exon_starts, exon_ends, species = [], temp_file_handler = None
):
if not isinstance( exon_starts, list ):
exon_starts = [exon_starts]
if not isinstance( exon_ends, list ):
exon_ends = [exon_ends]
assert len( exon_starts ) == len( exon_ends ), "The number of starts does
not match the number of sizes."
self.exons = []
+ if not temp_file_handler:
+ temp_file_handler = TempFileHandler()
+ self.temp_file_handler = temp_file_handler
for i in range( len( exon_starts ) ):
- self.exons.append( GenomicRegionAlignment( exon_starts[i], exon_ends[i],
species ) )
+ self.exons.append( GenomicRegionAlignment( exon_starts[i], exon_ends[i],
species, temp_file_handler=temp_file_handler ) )
#returns the names for species found in alignment, skipping names as requested
def get_species_names( self, skip = [] ):
@@ -140,14 +229,20 @@
#returns the sequence for a species
def get_sequence( self, species ):
- sequence = tempfile.TemporaryFile()
+ index, fh = self.temp_file_handler.get_open_tempfile()
for exon in self.exons:
if species in exon.get_species_names():
- sequence.write( exon.get_sequence( species ) )
+ seq = exon.get_sequence( species )
+ # we need to refetch fh here, since exon.get_sequence( species ) uses a tempfile
+ # and if max==1, it would close fh
+ index, fh = self.temp_file_handler.get_open_tempfile( index )
+ fh.write( seq )
else:
- sequence.write( "-" * exon.size )
- sequence.seek( 0 )
- return sequence.read()
+ fh.write( "-" * exon.size )
+ fh.seek( 0 )
+ rval = fh.read()
+ self.temp_file_handler.close( index, delete=True )
+ return rval
#returns the reverse complement of the sequence for a species
def get_sequence_reverse_complement( self, species ):
@@ -366,9 +461,9 @@
yield block, idx, offset
#returns a filled region alignment for specified regions
-def get_region_alignment( index, primary_species, chrom, start, end, strand =
'+', species = None, mincols = 0, overwrite_with_gaps = True ):
- if species is not None: alignment = RegionAlignment( end - start, species )
- else: alignment = RegionAlignment( end - start, primary_species )
+def get_region_alignment( index, primary_species, chrom, start, end, strand =
'+', species = None, mincols = 0, overwrite_with_gaps = True, temp_file_handler =
None ):
+ if species is not None: alignment = RegionAlignment( end - start, species,
temp_file_handler=temp_file_handler )
+ else: alignment = RegionAlignment( end - start, primary_species,
temp_file_handler=temp_file_handler )
return fill_region_alignment( alignment, index, primary_species, chrom, start, end,
strand, species, mincols, overwrite_with_gaps )
#reduces a block to only positions exisiting in the src provided
@@ -437,10 +532,10 @@
return alignment
#returns a filled spliced region alignment for specified region with start and end lists
-def get_spliced_region_alignment( index, primary_species, chrom, starts, ends, strand =
'+', species = None, mincols = 0, overwrite_with_gaps = True ):
+def get_spliced_region_alignment( index, primary_species, chrom, starts, ends, strand =
'+', species = None, mincols = 0, overwrite_with_gaps = True, temp_file_handler =
None ):
#create spliced alignment object
- if species is not None: alignment = SplicedAlignment( starts, ends, species )
- else: alignment = SplicedAlignment( starts, ends, [primary_species] )
+ if species is not None: alignment = SplicedAlignment( starts, ends, species,
temp_file_handler=temp_file_handler )
+ else: alignment = SplicedAlignment( starts, ends, [primary_species],
temp_file_handler=temp_file_handler )
for exon in alignment.exons:
fill_region_alignment( exon, index, primary_species, chrom, exon.start, exon.end,
strand, species, mincols, overwrite_with_gaps )
return alignment
Repository URL:
https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because the commit notification service is enabled and this
address is listed as the recipient.