details: http://www.bx.psu.edu/hg/galaxy/rev/6127f0928166 changeset: 3221:6127f0928166 user: Greg Von Kuster <greg@bx.psu.edu> date: Fri Jan 08 16:22:44 2010 -0500 description: Enhance lastz to better handle files with many sequences. diffstat: tools/sr_mapping/lastz_wrapper.py | 194 ++++++++++++++++++++++-------------- tools/sr_mapping/lastz_wrapper.xml | 18 +-- 2 files changed, 127 insertions(+), 85 deletions(-) diffs (343 lines): diff -r b4f1f54ced6a -r 6127f0928166 tools/sr_mapping/lastz_wrapper.py --- a/tools/sr_mapping/lastz_wrapper.py Fri Jan 08 11:28:37 2010 -0500 +++ b/tools/sr_mapping/lastz_wrapper.py Fri Jan 08 16:22:44 2010 -0500 @@ -29,10 +29,9 @@ --coverage: The minimum coverage value (don't report matches covering less than this) --out_format: The format of the output file (sam, diffs, or tabular (general)) --output: The name of the output file - --num_threads: The number of threads to run --lastzSeqsFileDir: Directory of local lastz_seqs.loc file """ -import optparse, os, subprocess, shutil, sys, tempfile, threading +import optparse, os, subprocess, shutil, sys, tempfile, threading, time from Queue import Queue from galaxy import eggs @@ -40,39 +39,86 @@ pkg_resources.require( 'bx-python' ) from bx.seq.twobit import * from bx.seq.fasta import FastaReader +from galaxy.util.bunch import Bunch + +STOP_SIGNAL = object() +WORKERS = 4 +SLOTS = 128 def stop_err( msg ): sys.stderr.write( "%s" % msg ) sys.exit() -class LastzJobRunner( object ): - """ - Lastz job runner backed by a pool of "num_threads" worker threads. FIFO scheduling - """ - def __init__( self, num_threads, commands ): - """Start the job runner with "num_threads" worker threads""" - # start workers - self.queue = Queue() - for command in commands: - self.queue.put( command ) +def stop_queues( lastz, combine_data ): + # This method should only be called if an error has been encountered. + # Send STOP_SIGNAL to all worker threads + for t in lastz.threads: + lastz.put( STOP_SIGNAL, True ) + combine_data.put( STOP_SIGNAL, True ) + +class BaseQueue( object ): + def __init__( self, num_threads, slots=-1 ): + # Initialize the queue and worker threads + self.queue = Queue( slots ) self.threads = [] for i in range( num_threads ): worker = threading.Thread( target=self.run_next ) worker.start() self.threads.append( worker ) - for worker in self.threads: - worker.join() def run_next( self ): - """Run the next command, waiting until one is available if necessary""" - while not self.queue.empty(): - command = self.queue.get() - self.run_job( command ) - def run_job( self, command ): - try: - proc = subprocess.Popen( args=command, shell=True ) - proc.wait() - except Exception, e: - stop_err( "Error executing command (%s) - %s" % ( str( command ), str( e ) ) ) + # Run the next job, waiting until one is available if necessary + while True: + job = self.queue.get() + if job is STOP_SIGNAL: + return self.shutdown() + self.run_job( job ) + time.sleep( 1 ) + def run_job( self, job ): + stop_err( 'Not Implemented' ) + def put( self, job, block=False ): + # Add a job to the queue + self.queue.put( job, block ) + def shutdown( self ): + return + +class LastzJobQueue( BaseQueue ): + """ + A queue that runs commands in parallel. Blocking is done so the queue will + not consume much memory. + """ + def run_job( self, job ): + # Execute the job's command + proc = subprocess.Popen( args=job.command, shell=True, stderr=subprocess.PIPE, ) + proc.wait() + stderr = proc.stderr.read() + proc.wait() + if stderr: + stop_queues( self, job.combine_data_queue ) + stop_err( stderr ) + job.combine_data_queue.put( job ) + +class CombineDataQueue( BaseQueue ): + """ + A queue that concatenates files in serial. Blocking is not done since this + queue is not expected to grow larger than the command queue. + """ + def __init__( self, output_filename, num_threads=1 ): + BaseQueue.__init__( self, num_threads ) + self.CHUNK_SIZE = 2**20 # 1Mb + self.output_file = open( output_filename, 'wb' ) + def run_job( self, job ): + in_file = open( job.output, 'rb' ) + while True: + chunk = in_file.read( self.CHUNK_SIZE ) + if not chunk: + in_file.close() + break + self.output_file.write( chunk ) + for file_name in job.cleanup: + os.remove( file_name ) + def shutdown( self ): + self.output_file.close() + return def __main__(): #Parse Command Line @@ -101,17 +147,9 @@ parser.add_option( '', '--coverage', dest='coverage', help="The minimum coverage value (don't report matches covering less than this)" ) parser.add_option( '', '--out_format', dest='format', help='The format of the output file (sam, diffs, or tabular (general))' ) parser.add_option( '', '--output', dest='output', help='The output file' ) - parser.add_option( '', '--num_threads', dest='num_threads', help='The number of threads to run' ) parser.add_option( '', '--lastzSeqsFileDir', dest='lastzSeqsFileDir', help='Directory of local lastz_seqs.loc file' ) ( options, args ) = parser.parse_args() - # If the reference sequences are from the history, temporary input files will be created - # ( 1 for each sequence ), and we'll keep track of them for later removal from disk ( by closing them ) - tmp_in_file_names = [] - # Each thread will create a temporary file to which it writes the output from lastz - tmp_out_file_names = [] - # Execution of lastz based on job splitting - commands = [] if options.ref_name != 'None': ref_name = '[nickname=%s]' % options.ref_name else: @@ -122,9 +160,8 @@ # Prepare for user-specified options else: set_options = '--%s --%s --gapped --strand=%s --seed=%s --%s O=%s E=%s X=%s Y=%s K=%s L=%s --%s' % \ - ( options.gfextend, options.chain, options.strand, options.seed, - options.transition, options.O, options.E, options.X, - options.Y, options.K, options.L, options.entropy ) + ( options.gfextend, options.chain, options.strand, options.seed, options.transition, + options.O, options.E, options.X, options.Y, options.K, options.L, options.entropy ) # Specify input2 and add [fullnames] modifier if output format is diffs if options.format == 'diffs': input2 = '%s[fullnames]' % options.input2 @@ -141,15 +178,23 @@ else: format = options.format tabular_fields = '' + + # Set up our queues + lastz_job_queue = LastzJobQueue( WORKERS, slots=SLOTS ) + combine_data_queue = CombineDataQueue( options.output ) + if options.ref_source == 'history': - # Reference is a fasta dataset from the history, so split job across number of sequences in the dataset + # Reference is a fasta dataset from the history, so split job across + # the number of sequences in the dataset ( this could be a HUGE number ) try: # Ensure there is at least 1 sequence in the dataset ( this may not be necessary ). error_msg = "The reference dataset is missing metadata, click the pencil icon in the history item and 'auto-detect' the metadata attributes." ref_sequences = int( options.ref_sequences ) if ref_sequences < 1: + stop_queues( lastz_job_queue, combine_data_queue ) stop_err( error_msg ) except: + stop_queues( lastz_job_queue, combine_data_queue ) stop_err( error_msg ) seqs = 0 fasta_reader = FastaReader( open( options.input1 ) ) @@ -160,60 +205,61 @@ break seqs += 1 # Create a temporary file to contain the current sequence as input to lastz - tmp_in = tempfile.NamedTemporaryFile( prefix=seq.name, suffix='.fasta' ) - tmp_in_name = tmp_in.name - tmp_in.close() - tmp_in = file(tmp_in_name,'w+b') - # Keep track of our list of temporary input files so we can remove them later by closing them - tmp_in_file_names.append( tmp_in_name ) + tmp_in_fd, tmp_in_name = tempfile.mkstemp( suffix='.in' ) + tmp_in = os.fdopen( tmp_in_fd, 'wb' ) # Write the current sequence to the temporary input file tmp_in.write( '>%s\n%s\n' % ( seq.name, seq.text ) ) tmp_in.close() # Create a 2nd temporary file to contain the output from lastz execution on the current sequence - tmp_out = tempfile.NamedTemporaryFile( prefix='%s_out' % seq.name ) - tmp_out_name = tmp_out.name - tmp_out.close() - # Keep track of our list of temporary output files so we can merge them into our output dataset - tmp_out_file_names.append( tmp_out_name ) + tmp_out_fd, tmp_out_name = tempfile.mkstemp( suffix='.out' ) + os.close( tmp_out_fd ) # Generate the command line for calling lastz on the current sequence command = 'lastz %s%s %s %s --ambiguousn --nolaj --identity=%s..%s --coverage=%s --format=%s%s > %s' % \ ( tmp_in_name, ref_name, input2, set_options, options.identity_min, options.identity_max, options.coverage, format, tabular_fields, tmp_out_name ) - # Append the command line to our list of commands for sending to the LastzJobRunner queue - commands.append( command ) - # Make sure the value of sequences in the metadata is the - # same as the number of sequences read from the dataset ( this may not be necessary ). + # Create a job object + job = Bunch() + job.command = command + job.output = tmp_out_name + job.cleanup = [ tmp_in_name, tmp_out_name ] + job.combine_data_queue = combine_data_queue + # Add another job to the lastz_job_queue. Execution + # will wait at this point if the queue is full. + lastz_job_queue.put( job, block=True ) + # Make sure the value of sequences in the metadata is the same as the + # number of sequences read from the dataset ( this may not be necessary ). if ref_sequences != seqs: - stop_error( "The value of metadata.sequences (%d) differs from the number of sequences read from the reference ( %d)." % ( ref_sequences, seqs ) ) + stop_queues( lastz_job_queue, combine_data_queue ) + stop_err( "The value of metadata.sequences (%d) differs from the number of sequences read from the reference ( %d)." % ( ref_sequences, seqs ) ) else: # Reference is a locally cached 2bit file, split job across number of chroms in 2bit file tbf = TwoBitFile( open( options.input1, 'r' ) ) for chrom in tbf.keys(): # Create a temporary file to contain the output from lastz execution on the current chrom - tmp_out = tempfile.NamedTemporaryFile( prefix='%s_out' % chrom ) - tmp_out_name = tmp_out.name - tmp_out.close() - # Keep track of our list of temporary output files so we can merge them into our output dataset - tmp_out_file_names.append( tmp_out_name ) + tmp_out_fd, tmp_out_name = tempfile.mkstemp( suffix='.out' ) + os.close( tmp_out_fd ) command = 'lastz %s/%s%s %s %s --ambiguousn --nolaj --identity=%s..%s --coverage=%s --format=%s%s >> %s' % \ ( options.input1, chrom, ref_name, input2, set_options, options.identity_min, options.identity_max, options.coverage, format, tabular_fields, tmp_out_name ) - commands.append( command ) - job_runner = LastzJobRunner( int( options.num_threads ), commands ) - # Merge all of the output from lastz ( currently in temporary files ) into our output dataset - command = 'cat %s >> %s' % ( ' '.join( tmp_out_file_names ), options.output ) - proc = subprocess.Popen( args=command, shell=True ) - proc.wait() - # Remove all temporary files from disk by closing them - for name in tmp_in_file_names: - try: - os.remove( name ) - except: - pass - for name in tmp_out_file_names: - try: - os.remove( name ) - except: - pass + # Create a job object + job = Bunch() + job.command = command + job.output = tmp_out_name + job.cleanup = [ tmp_out_name ] + job.combine_data_queue = combine_data_queue + # Add another job to the lastz_job_queue. Execution + # will wait at this point if the queue is full. + lastz_job_queue.put( job, block=True ) + + # Stop the lastz_job_queue + for t in lastz_job_queue.threads: + lastz_job_queue.put( STOP_SIGNAL, True ) + # Although all jobs are submitted to the queue, we can't shut down the combine_data_queue + # until we know that all jobs have been submitted to its queue. We do this by checking + # whether all of the threads in the lastz_job_queue have terminated. + while threading.activeCount() > 2: + time.sleep( 1 ) + # Now it's safe to stop the combine_data_queue + combine_data_queue.put( STOP_SIGNAL ) if __name__=="__main__": __main__() diff -r b4f1f54ced6a -r 6127f0928166 tools/sr_mapping/lastz_wrapper.xml --- a/tools/sr_mapping/lastz_wrapper.xml Fri Jan 08 11:28:37 2010 -0500 +++ b/tools/sr_mapping/lastz_wrapper.xml Fri Jan 08 16:22:44 2010 -0500 @@ -19,7 +19,7 @@ #else: --pre_set_options="None" --strand=$params.strand --seed=$params.seed --gfextend=$params.gfextend --chain=$params.chain --transition=$params.transition --O=$params.O --E=$params.E --X=$params.X --Y=$params.Y --K=$params.K --L=$params.L --entropy=$params.entropy #end if ---identity_min=$min_ident --identity_max=$max_ident --coverage=$min_cvrg --output=$output1 --num_threads=$num_threads --lastzSeqsFileDir=${GALAXY_DATA_INDEX_DIR} +--identity_min=$min_ident --identity_max=$max_ident --coverage=$min_cvrg --output=$output1 --lastzSeqsFileDir=${GALAXY_DATA_INDEX_DIR} </command> <inputs> <param name="input2" format="fasta" type="data" label="Align sequencing reads in" /> @@ -109,10 +109,6 @@ <param name="min_ident" type="integer" size="3" value="0" label="Do not report matches below this identity (%)"/> <param name="max_ident" type="integer" size="3" value="100" label="Do not report matches above this identity (%)"/> <param name="min_cvrg" type="integer" size="3" value="0" label="Do not report matches that cover less than this percentage of each read"/> - <param name="num_threads" type="select" label="Number of threads" help="Split this job over the selected number of threads"> - <option value="4">4</option> - <option value="8">8</option> - </param> </inputs> <outputs> <data format="tabular" name="output1"> @@ -149,14 +145,15 @@ <param name="K" value="3000" /> <param name="L" value="3000" /> <param name="entropy" value="noentropy" /> - <!-- how_to_name is not the default. It is changed to modify - input1_2bit by adding the ref_name as a nickname --> + <!-- + how_to_name is not the default. It is changed to modify + input1_2bit by adding the ref_name as a nickname + --> <param name="how_to_name" value="yes" /> <param name="ref_name" value="Ref" /> <param name="min_ident" value="0" /> <param name="max_ident" value="100" /> <param name="min_cvrg" value="0" /> - <param name="num_threads" value="4" /> <output name="output1" file="lastz_wrapper_out2.sam" /> </test> <test> @@ -176,11 +173,10 @@ <param name="min_ident" value="0" /> <param name="max_ident" value="100" /> <param name="min_cvrg" value="0" /> - <param name="num_threads" value="4" /> <output name="output1" file="lastz_wrapper_out3.tabular" /> </test> - <test> - <!-- + <test> + <!-- Lastz command: first you will need to split the file phiX_split.fasta into two files, phiX1.fasta and phiX2.fasta, each with 1 sequence (phiX1 and phiX2, respectively). Then: lastz phiX1.fasta test-data/b1.fasta *yasra95short *ambiguousn *nolaj *identity=0..100 *coverage=0 *format=general-:score,name1,strand1,size1,start1,zstart1,end1,length1,text1,name2,strand2,size2,start2,zstart2,end2,start2+,zstart2+,end2+,length2,text2,diff,cigar,identity,coverage,gaprate,diagonal,shingle > lastz_wrapper_out4.tabular @@ -200,7 +196,6 @@ <param name="min_ident" value="0" /> <param name="max_ident" value="100" /> <param name="min_cvrg" value="0" /> - <param name="num_threads" value="4" /> <output name="output1" file="lastz_wrapper_out4.tabular" /> </test> </tests>