commit/galaxy-central: 10 new changesets
10 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/changeset/876baf3452a8/ changeset: 876baf3452a8 branch: split_blast user: peterjc date: 2012-02-16 13:14:54 summary: Enable splitting BLAST jobs up (doesn't work yet) affected #: 5 files diff -r 5ac62b79d6926ad0a9db8810dedf9fe1a1ed41fa -r 876baf3452a8ace8ac58deff41a695a6248794cb tools/ncbi_blast_plus/ncbi_blastn_wrapper.xml --- a/tools/ncbi_blast_plus/ncbi_blastn_wrapper.xml +++ b/tools/ncbi_blast_plus/ncbi_blastn_wrapper.xml @@ -1,5 +1,6 @@ <tool id="ncbi_blastn_wrapper" name="NCBI BLAST+ blastn" version="0.0.11"><description>Search nucleotide database with nucleotide query sequence(s)</description> + <parallelism method="multi" split_inputs="query" shared_inputs="subject" merge_outputs="output1"></parallelism><version_command>blastn -version</version_command><command interpreter="python">hide_stderr.py ## The command is a Cheetah template which allows some Python based syntax. diff -r 5ac62b79d6926ad0a9db8810dedf9fe1a1ed41fa -r 876baf3452a8ace8ac58deff41a695a6248794cb tools/ncbi_blast_plus/ncbi_blastp_wrapper.xml --- a/tools/ncbi_blast_plus/ncbi_blastp_wrapper.xml +++ b/tools/ncbi_blast_plus/ncbi_blastp_wrapper.xml @@ -1,5 +1,6 @@ <tool id="ncbi_blastp_wrapper" name="NCBI BLAST+ blastp" version="0.0.11"><description>Search protein database with protein query sequence(s)</description> + <parallelism method="multi" split_inputs="query" shared_inputs="subject" merge_outputs="output1"></parallelism><version_command>blastp -version</version_command><command interpreter="python">hide_stderr.py ## The command is a Cheetah template which allows some Python based syntax. 
diff -r 5ac62b79d6926ad0a9db8810dedf9fe1a1ed41fa -r 876baf3452a8ace8ac58deff41a695a6248794cb tools/ncbi_blast_plus/ncbi_blastx_wrapper.xml --- a/tools/ncbi_blast_plus/ncbi_blastx_wrapper.xml +++ b/tools/ncbi_blast_plus/ncbi_blastx_wrapper.xml @@ -1,5 +1,6 @@ <tool id="ncbi_blastx_wrapper" name="NCBI BLAST+ blastx" version="0.0.11"><description>Search protein database with translated nucleotide query sequence(s)</description> + <parallelism method="multi" split_inputs="query" shared_inputs="subject" merge_outputs="output1"></parallelism><version_command>blastx -version</version_command><command interpreter="python">hide_stderr.py ## The command is a Cheetah template which allows some Python based syntax. diff -r 5ac62b79d6926ad0a9db8810dedf9fe1a1ed41fa -r 876baf3452a8ace8ac58deff41a695a6248794cb tools/ncbi_blast_plus/ncbi_tblastn_wrapper.xml --- a/tools/ncbi_blast_plus/ncbi_tblastn_wrapper.xml +++ b/tools/ncbi_blast_plus/ncbi_tblastn_wrapper.xml @@ -1,5 +1,6 @@ <tool id="ncbi_tblastn_wrapper" name="NCBI BLAST+ tblastn" version="0.0.11"><description>Search translated nucleotide database with protein query sequence(s)</description> + <parallelism method="multi" split_inputs="query" shared_inputs="subject" merge_outputs="output1"></parallelism><version_command>tblastn -version</version_command><command interpreter="python">hide_stderr.py ## The command is a Cheetah template which allows some Python based syntax. 
diff -r 5ac62b79d6926ad0a9db8810dedf9fe1a1ed41fa -r 876baf3452a8ace8ac58deff41a695a6248794cb tools/ncbi_blast_plus/ncbi_tblastx_wrapper.xml --- a/tools/ncbi_blast_plus/ncbi_tblastx_wrapper.xml +++ b/tools/ncbi_blast_plus/ncbi_tblastx_wrapper.xml @@ -1,5 +1,6 @@ <tool id="ncbi_tblastx_wrapper" name="NCBI BLAST+ tblastx" version="0.0.11"><description>Search translated nucleotide database with translated nucleotide query sequence(s)</description> + <parallelism method="multi" split_inputs="query" shared_inputs="subject" merge_outputs="output1"></parallelism><version_command>tblastx -version</version_command><command interpreter="python">hide_stderr.py ## The command is a Cheetah template which allows some Python based syntax. https://bitbucket.org/galaxy/galaxy-central/changeset/762777618073/ changeset: 762777618073 branch: split_blast user: peterjc date: 2012-02-16 13:15:48 summary: Move FASTQ splitting from Sequence class to Fastq class affected #: 1 file diff -r 876baf3452a8ace8ac58deff41a695a6248794cb -r 76277761807306ec2be3f1e4059dd7cde6fd2dc6 lib/galaxy/datatypes/sequence.py --- a/lib/galaxy/datatypes/sequence.py +++ b/lib/galaxy/datatypes/sequence.py @@ -190,143 +190,10 @@ write_split_files = classmethod(write_split_files) def split( cls, input_datasets, subdir_generator_function, split_params): - """ - FASTQ files are split on cluster boundaries, in increments of 4 lines - """ + """Split a generic sequence file (not sensible or possible, see subclasses).""" if split_params is None: return None - - # first, see if there are any associated FQTOC files that will give us the split locations - # if so, we don't need to read the files to do the splitting - toc_file_datasets = [] - for ds in input_datasets: - tmp_ds = ds - fqtoc_file = None - while fqtoc_file is None and tmp_ds is not None: - fqtoc_file = tmp_ds.get_converted_files_by_type('fqtoc') - tmp_ds = tmp_ds.copied_from_library_dataset_dataset_association - - if fqtoc_file is not None: - 
toc_file_datasets.append(fqtoc_file) - - if len(toc_file_datasets) == len(input_datasets): - return cls.do_fast_split(input_datasets, toc_file_datasets, subdir_generator_function, split_params) - return cls.do_slow_split(input_datasets, subdir_generator_function, split_params) - split = classmethod(split) - - def process_split_file(data): - """ - This is called in the context of an external process launched by a Task (possibly not on the Galaxy machine) - to create the input files for the Task. The parameters: - data - a dict containing the contents of the split file - """ - args = data['args'] - input_name = data['input_name'] - output_name = data['output_name'] - start_sequence = long(args['start_sequence']) - sequence_count = long(args['num_sequences']) - - if 'toc_file' in args: - toc_file = simplejson.load(open(args['toc_file'], 'r')) - commands = Sequence.get_split_commands_with_toc(input_name, output_name, toc_file, start_sequence, sequence_count) - else: - commands = Sequence.get_split_commands_sequential(is_gzip(input_name), input_name, output_name, start_sequence, sequence_count) - for cmd in commands: - if 0 != os.system(cmd): - raise Exception("Executing '%s' failed" % cmd) - return True - process_split_file = staticmethod(process_split_file) - - def get_split_commands_with_toc(input_name, output_name, toc_file, start_sequence, sequence_count): - """ - Uses a Table of Contents dict, parsed from an FQTOC file, to come up with a set of - shell commands that will extract the parts necessary - >>> three_sections=[dict(start=0, end=74, sequences=10), dict(start=74, end=148, sequences=10), dict(start=148, end=148+76, sequences=10)] - >>> Sequence.get_split_commands_with_toc('./input.gz', './output.gz', dict(sections=three_sections), start_sequence=0, sequence_count=10) - ['dd bs=1 skip=0 count=74 if=./input.gz 2> /dev/null >> ./output.gz'] - >>> Sequence.get_split_commands_with_toc('./input.gz', './output.gz', dict(sections=three_sections), start_sequence=1, 
sequence_count=5) - ['(dd bs=1 skip=0 count=74 if=./input.gz 2> /dev/null )| zcat | ( tail -n +5 2> /dev/null) | head -20 | gzip -c >> ./output.gz'] - >>> Sequence.get_split_commands_with_toc('./input.gz', './output.gz', dict(sections=three_sections), start_sequence=0, sequence_count=20) - ['dd bs=1 skip=0 count=148 if=./input.gz 2> /dev/null >> ./output.gz'] - >>> Sequence.get_split_commands_with_toc('./input.gz', './output.gz', dict(sections=three_sections), start_sequence=5, sequence_count=10) - ['(dd bs=1 skip=0 count=74 if=./input.gz 2> /dev/null )| zcat | ( tail -n +21 2> /dev/null) | head -20 | gzip -c >> ./output.gz', '(dd bs=1 skip=74 count=74 if=./input.gz 2> /dev/null )| zcat | ( tail -n +1 2> /dev/null) | head -20 | gzip -c >> ./output.gz'] - >>> Sequence.get_split_commands_with_toc('./input.gz', './output.gz', dict(sections=three_sections), start_sequence=10, sequence_count=10) - ['dd bs=1 skip=74 count=74 if=./input.gz 2> /dev/null >> ./output.gz'] - >>> Sequence.get_split_commands_with_toc('./input.gz', './output.gz', dict(sections=three_sections), start_sequence=5, sequence_count=20) - ['(dd bs=1 skip=0 count=74 if=./input.gz 2> /dev/null )| zcat | ( tail -n +21 2> /dev/null) | head -20 | gzip -c >> ./output.gz', 'dd bs=1 skip=74 count=74 if=./input.gz 2> /dev/null >> ./output.gz', '(dd bs=1 skip=148 count=76 if=./input.gz 2> /dev/null )| zcat | ( tail -n +1 2> /dev/null) | head -20 | gzip -c >> ./output.gz'] - """ - sections = toc_file['sections'] - result = [] - - current_sequence = long(0) - i=0 - # skip to the section that contains my starting sequence - while i < len(sections) and start_sequence >= current_sequence + long(sections[i]['sequences']): - current_sequence += long(sections[i]['sequences']) - i += 1 - if i == len(sections): # bad input data! 
- raise Exception('No FQTOC section contains starting sequence %s' % start_sequence) - - # These two variables act as an accumulator for consecutive entire blocks that - # can be copied verbatim (without decompressing) - start_chunk = long(-1) - end_chunk = long(-1) - copy_chunk_cmd = 'dd bs=1 skip=%s count=%s if=%s 2> /dev/null >> %s' - - while sequence_count > 0 and i < len(sections): - # we need to extract partial data. So, find the byte offsets of the chunks that contain the data we need - # use a combination of dd (to pull just the right sections out) tail (to skip lines) and head (to get the - # right number of lines - sequences = long(sections[i]['sequences']) - skip_sequences = start_sequence-current_sequence - sequences_to_extract = min(sequence_count, sequences-skip_sequences) - start_copy = long(sections[i]['start']) - end_copy = long(sections[i]['end']) - if sequences_to_extract < sequences: - if start_chunk > -1: - result.append(copy_chunk_cmd % (start_chunk, end_chunk-start_chunk, input_name, output_name)) - start_chunk = -1 - # extract, unzip, trim, recompress - result.append('(dd bs=1 skip=%s count=%s if=%s 2> /dev/null )| zcat | ( tail -n +%s 2> /dev/null) | head -%s | gzip -c >> %s' % - (start_copy, end_copy-start_copy, input_name, skip_sequences*4+1, sequences_to_extract*4, output_name)) - else: # whole section - add it to the start_chunk/end_chunk accumulator - if start_chunk == -1: - start_chunk = start_copy - end_chunk = end_copy - sequence_count -= sequences_to_extract - start_sequence += sequences_to_extract - current_sequence += sequences - i += 1 - if start_chunk > -1: - result.append(copy_chunk_cmd % (start_chunk, end_chunk-start_chunk, input_name, output_name)) - - if sequence_count > 0: - raise Exception('%s sequences not found in file' % sequence_count) - - return result - get_split_commands_with_toc = staticmethod(get_split_commands_with_toc) - - - def get_split_commands_sequential(is_compressed, input_name, output_name, 
start_sequence, sequence_count): - """ - Does a brain-dead sequential scan & extract of certain sequences - >>> Sequence.get_split_commands_sequential(True, './input.gz', './output.gz', start_sequence=0, sequence_count=10) - ['zcat "./input.gz" | ( tail -n +1 2> /dev/null) | head -40 | gzip -c > "./output.gz"'] - >>> Sequence.get_split_commands_sequential(False, './input.fastq', './output.fastq', start_sequence=10, sequence_count=10) - ['tail -n +41 "./input.fastq" 2> /dev/null | head -40 > "./output.fastq"'] - """ - start_line = start_sequence * 4 - line_count = sequence_count * 4 - # TODO: verify that tail can handle 64-bit numbers - if is_compressed: - cmd = 'zcat "%s" | ( tail -n +%s 2> /dev/null) | head -%s | gzip -c' % (input_name, start_line+1, line_count) - else: - cmd = 'tail -n +%s "%s" 2> /dev/null | head -%s' % (start_line+1, input_name, line_count) - cmd += ' > "%s"' % output_name - - return [cmd] - get_split_commands_sequential = staticmethod(get_split_commands_sequential) - + raise NotImplementedError("Can't split generic sequence files") class Alignment( data.Text ): @@ -335,6 +202,13 @@ """Add metadata elements""" MetadataElement( name="species", desc="Species", default=[], param=metadata.SelectParameter, multiple=True, readonly=True, no_value=None ) + def split( cls, input_datasets, subdir_generator_function, split_params): + """Split a generic alignment file (not sensible or possible, see subclasses).""" + if split_params is None: + return None + raise NotImplementedError("Can't split generic alignment files") + + class Fasta( Sequence ): """Class representing a FASTA sequence""" file_ext = "fasta" @@ -502,6 +376,55 @@ except: return False + def split( cls, input_datasets, subdir_generator_function, split_params): + """ + FASTQ files are split on cluster boundaries, in increments of 4 lines + """ + if split_params is None: + return None + + # first, see if there are any associated FQTOC files that will give us the split locations + # if so, we 
don't need to read the files to do the splitting + toc_file_datasets = [] + for ds in input_datasets: + tmp_ds = ds + fqtoc_file = None + while fqtoc_file is None and tmp_ds is not None: + fqtoc_file = tmp_ds.get_converted_files_by_type('fqtoc') + tmp_ds = tmp_ds.copied_from_library_dataset_dataset_association + + if fqtoc_file is not None: + toc_file_datasets.append(fqtoc_file) + + if len(toc_file_datasets) == len(input_datasets): + return cls.do_fast_split(input_datasets, toc_file_datasets, subdir_generator_function, split_params) + return cls.do_slow_split(input_datasets, subdir_generator_function, split_params) + split = classmethod(split) + + def process_split_file(data): + """ + This is called in the context of an external process launched by a Task (possibly not on the Galaxy machine) + to create the input files for the Task. The parameters: + data - a dict containing the contents of the split file + """ + args = data['args'] + input_name = data['input_name'] + output_name = data['output_name'] + start_sequence = long(args['start_sequence']) + sequence_count = long(args['num_sequences']) + + if 'toc_file' in args: + toc_file = simplejson.load(open(args['toc_file'], 'r')) + commands = Sequence.get_split_commands_with_toc(input_name, output_name, toc_file, start_sequence, sequence_count) + else: + commands = Sequence.get_split_commands_sequential(is_gzip(input_name), input_name, output_name, start_sequence, sequence_count) + for cmd in commands: + if 0 != os.system(cmd): + raise Exception("Executing '%s' failed" % cmd) + return True + process_split_file = staticmethod(process_split_file) + + class FastqSanger( Fastq ): """Class representing a FASTQ sequence ( the Sanger variant )""" file_ext = "fastqsanger" https://bitbucket.org/galaxy/galaxy-central/changeset/ebe94a2c25c3/ changeset: ebe94a2c25c3 branch: split_blast user: peterjc date: 2012-02-16 17:22:58 summary: Not all datatype splitters write a JSON file affected #: 1 file diff -r 
76277761807306ec2be3f1e4059dd7cde6fd2dc6 -r ebe94a2c25c365cce3058aa963717a088627a526 scripts/extract_dataset_part.py --- a/scripts/extract_dataset_part.py +++ b/scripts/extract_dataset_part.py @@ -31,6 +31,9 @@ Argument: a JSON file """ file_path = sys.argv.pop( 1 ) + if not os.path.isfile(file_path): + #Nothing to do - some splitters don't write a JSON file + sys.exit(0) data = simplejson.load(open(file_path, 'r')) try: class_name_parts = data['class_name'].split('.') https://bitbucket.org/galaxy/galaxy-central/changeset/416c961c0da9/ changeset: 416c961c0da9 branch: split_blast user: peterjc date: 2012-02-16 19:20:29 summary: Simple FASTA splitting (no JSON metadata files) affected #: 1 file diff -r ebe94a2c25c365cce3058aa963717a088627a526 -r 416c961c0da95ec92bcf47a3272bdb278c42d7c6 lib/galaxy/datatypes/sequence.py --- a/lib/galaxy/datatypes/sequence.py +++ b/lib/galaxy/datatypes/sequence.py @@ -261,6 +261,66 @@ pass return False + def split(cls, input_datasets, subdir_generator_function, split_params): + """Split a FASTA file sequence by sequence.""" + if split_params is None: + return + if len(input_datasets) > 1: + raise Exception("FASTA file splitting does not support multiple files") + input_file = input_datasets[0].file_name + + #Counting chunk size as number of sequences. + if 'split_mode' not in split_params: + raise Exception('Tool does not define a split mode') + elif split_params['split_mode'] == 'number_of_parts': + #if split_mode = number_of_parts, and split_size = 10, then + #we count the number of sequences (say 1234) and divide by + #by ten, giving ten files of approx 123 sequences each. 
+ chunk_size = 123 + elif split_params['split_mode'] == 'to_size': + #Split the input file into as many sub-files as required, + #each containing to_size many sequences + chunk_size = int(split_params['split_size']) + else: + raise Exception('Unsupported split mode %s' % split_params['split_mode']) + + log.debug("Attemping to split FASTA file %s into chunks of %i sequences" \ + % (input_file, chunk_size)) + f = open(input_file, "rU") + part_file = None + try: + #Note if the input FASTA file has no sequences, we will + #produce just one sub-file which will be a copy of it. + part_dir = subdir_generator_function() + part_path = os.path.join(part_dir, os.path.basename(input_file)) + part_file = open(part_path, 'w') + log.debug("Writing %s part to %s" % (input_file, part_path)) + rec_count = 0 + while True: + line = f.readline() + if not line: + break + if line[0]==">": + rec_count += 1 + if rec_count > chunk_size: + #Start a new sub-file + part_file.close() + part_dir = subdir_generator_function() + part_path = os.path.join(part_dir, os.path.basename(input_file)) + part_file = open(part_path, 'w') + log.debug("Writing %s part to %s" % (input_file, part_path)) + rec_count = 1 + part_file.write(line) + part_file.close() + except Exception, e: + log.error('Unable to split FASTA file: %s' % str(e)) + f.close() + if part_file is not None: + part_file.close() + raise + f.close() + split = classmethod(split) + class csFasta( Sequence ): """ Class representing the SOLID Color-Space sequence ( csfasta ) """ file_ext = "csfasta" https://bitbucket.org/galaxy/galaxy-central/changeset/44c2446e05f0/ changeset: 44c2446e05f0 branch: split_blast user: peterjc date: 2012-02-16 19:21:32 summary: Use FASTA splitting in BLAST wrappers affected #: 5 files diff -r 416c961c0da95ec92bcf47a3272bdb278c42d7c6 -r 44c2446e05f01665fa393caec44193004857d7b7 tools/ncbi_blast_plus/ncbi_blastn_wrapper.xml --- a/tools/ncbi_blast_plus/ncbi_blastn_wrapper.xml +++ 
b/tools/ncbi_blast_plus/ncbi_blastn_wrapper.xml @@ -1,6 +1,7 @@ <tool id="ncbi_blastn_wrapper" name="NCBI BLAST+ blastn" version="0.0.11"><description>Search nucleotide database with nucleotide query sequence(s)</description> - <parallelism method="multi" split_inputs="query" shared_inputs="subject" merge_outputs="output1"></parallelism> + <!-- If job splitting is enabled, break up the query file into batches of 500 sequences --> + <parallelism method="multi" split_inputs="query" split_mode="to_size" split_size="500" shared_inputs="subject" merge_outputs="output1"></parallelism><version_command>blastn -version</version_command><command interpreter="python">hide_stderr.py ## The command is a Cheetah template which allows some Python based syntax. diff -r 416c961c0da95ec92bcf47a3272bdb278c42d7c6 -r 44c2446e05f01665fa393caec44193004857d7b7 tools/ncbi_blast_plus/ncbi_blastp_wrapper.xml --- a/tools/ncbi_blast_plus/ncbi_blastp_wrapper.xml +++ b/tools/ncbi_blast_plus/ncbi_blastp_wrapper.xml @@ -1,6 +1,7 @@ <tool id="ncbi_blastp_wrapper" name="NCBI BLAST+ blastp" version="0.0.11"><description>Search protein database with protein query sequence(s)</description> - <parallelism method="multi" split_inputs="query" shared_inputs="subject" merge_outputs="output1"></parallelism> + <!-- If job splitting is enabled, break up the query file into batches of 500 sequences --> + <parallelism method="multi" split_inputs="query" split_mode="to_size" split_size="500" shared_inputs="subject" merge_outputs="output1"></parallelism><version_command>blastp -version</version_command><command interpreter="python">hide_stderr.py ## The command is a Cheetah template which allows some Python based syntax. 
diff -r 416c961c0da95ec92bcf47a3272bdb278c42d7c6 -r 44c2446e05f01665fa393caec44193004857d7b7 tools/ncbi_blast_plus/ncbi_blastx_wrapper.xml --- a/tools/ncbi_blast_plus/ncbi_blastx_wrapper.xml +++ b/tools/ncbi_blast_plus/ncbi_blastx_wrapper.xml @@ -1,6 +1,7 @@ <tool id="ncbi_blastx_wrapper" name="NCBI BLAST+ blastx" version="0.0.11"><description>Search protein database with translated nucleotide query sequence(s)</description> - <parallelism method="multi" split_inputs="query" shared_inputs="subject" merge_outputs="output1"></parallelism> + <!-- If job splitting is enabled, break up the query file into batches of 500 sequences --> + <parallelism method="multi" split_inputs="query" split_mode="to_size" split_size="500" shared_inputs="subject" merge_outputs="output1"></parallelism><version_command>blastx -version</version_command><command interpreter="python">hide_stderr.py ## The command is a Cheetah template which allows some Python based syntax. diff -r 416c961c0da95ec92bcf47a3272bdb278c42d7c6 -r 44c2446e05f01665fa393caec44193004857d7b7 tools/ncbi_blast_plus/ncbi_tblastn_wrapper.xml --- a/tools/ncbi_blast_plus/ncbi_tblastn_wrapper.xml +++ b/tools/ncbi_blast_plus/ncbi_tblastn_wrapper.xml @@ -1,6 +1,7 @@ <tool id="ncbi_tblastn_wrapper" name="NCBI BLAST+ tblastn" version="0.0.11"><description>Search translated nucleotide database with protein query sequence(s)</description> - <parallelism method="multi" split_inputs="query" shared_inputs="subject" merge_outputs="output1"></parallelism> + <!-- If job splitting is enabled, break up the query file into batches of 500 sequences --> + <parallelism method="multi" split_inputs="query" split_mode="to_size" split_size="500" shared_inputs="subject" merge_outputs="output1"></parallelism><version_command>tblastn -version</version_command><command interpreter="python">hide_stderr.py ## The command is a Cheetah template which allows some Python based syntax. 
diff -r 416c961c0da95ec92bcf47a3272bdb278c42d7c6 -r 44c2446e05f01665fa393caec44193004857d7b7 tools/ncbi_blast_plus/ncbi_tblastx_wrapper.xml --- a/tools/ncbi_blast_plus/ncbi_tblastx_wrapper.xml +++ b/tools/ncbi_blast_plus/ncbi_tblastx_wrapper.xml @@ -1,6 +1,7 @@ <tool id="ncbi_tblastx_wrapper" name="NCBI BLAST+ tblastx" version="0.0.11"><description>Search translated nucleotide database with translated nucleotide query sequence(s)</description> - <parallelism method="multi" split_inputs="query" shared_inputs="subject" merge_outputs="output1"></parallelism> + <!-- If job splitting is enabled, break up the query file into batches of 500 sequences --> + <parallelism method="multi" split_inputs="query" split_mode="to_size" split_size="500" shared_inputs="subject" merge_outputs="output1"></parallelism><version_command>tblastx -version</version_command><command interpreter="python">hide_stderr.py ## The command is a Cheetah template which allows some Python based syntax. https://bitbucket.org/galaxy/galaxy-central/changeset/26a0c0aa776d/ changeset: 26a0c0aa776d branch: split_blast user: peterjc date: 2012-02-17 13:05:15 summary: Size based FASTA splitting affected #: 1 file diff -r 44c2446e05f01665fa393caec44193004857d7b7 -r 26a0c0aa776d5ab557263e3704c2cb6e2fce7a30 lib/galaxy/datatypes/sequence.py --- a/lib/galaxy/datatypes/sequence.py +++ b/lib/galaxy/datatypes/sequence.py @@ -262,7 +262,14 @@ return False def split(cls, input_datasets, subdir_generator_function, split_params): - """Split a FASTA file sequence by sequence.""" + """Split a FASTA file sequence by sequence. + + Note that even if split_mode="number_of_parts", the actual number of + sub-files produced may not match that requested by split_size. + + If split_mode="to_size" then split_size is treated as the number of + FASTA records to put in each sub-file (not size in bytes). 
+ """ if split_params is None: return if len(input_datasets) > 1: @@ -273,17 +280,76 @@ if 'split_mode' not in split_params: raise Exception('Tool does not define a split mode') elif split_params['split_mode'] == 'number_of_parts': - #if split_mode = number_of_parts, and split_size = 10, then - #we count the number of sequences (say 1234) and divide by + split_size = int(split_params['split_size']) + log.debug("Split %s into %i parts..." % (input_file, split_size)) + #if split_mode = number_of_parts, and split_size = 10, and + #we know the number of sequences (say 1234), then divide by #by ten, giving ten files of approx 123 sequences each. - chunk_size = 123 + if input_datasets[0].metadata is not None \ + and input_datasets[0].metadata.sequences: + #Galaxy has already counted/estimated the number + batch_size = 1 + input_datasets[0].metadata.sequences // split_size + cls._count_split(input_file, batch_size, subdir_generator_function) + else: + #OK, if Galaxy hasn't counted them, it may be a big file. + #We're not going to count the records which would be slow + #and a waste of disk IO time - instead we'll split using + #the file size. + chunk_size = os.path.getsize(input_file) // split_size + cls._size_split(input_file, chunk_size, subdir_generator_function) elif split_params['split_mode'] == 'to_size': #Split the input file into as many sub-files as required, #each containing to_size many sequences - chunk_size = int(split_params['split_size']) + batch_size = int(split_params['split_size']) + log.debug("Split %s into batches of %i records..." % (input_file, batch_size)) + cls._count_split(input_file, batch_size, subdir_generator_function) else: raise Exception('Unsupported split mode %s' % split_params['split_mode']) + split = classmethod(split) + def _size_split(cls, input_file, chunk_size, subdir_generator_function): + """Split a FASTA file into chunks based on size on disk. 
+ + This does of course preserve complete records - it only splits at the + start of a new FASTQ sequence record. + """ + log.debug("Attemping to split FASTA file %s into chunks of %i bytes" \ + % (input_file, chunk_size)) + f = open(input_file, "rU") + part_file = None + try: + #Note if the input FASTA file has no sequences, we will + #produce just one sub-file which will be a copy of it. + part_dir = subdir_generator_function() + part_path = os.path.join(part_dir, os.path.basename(input_file)) + part_file = open(part_path, 'w') + log.debug("Writing %s part to %s" % (input_file, part_path)) + start_offset = 0 + while True: + offset = f.tell() + line = f.readline() + if not line: + break + if line[0]==">" and offset - start_offset >= chunk_size: + #Start a new sub-file + part_file.close() + part_dir = subdir_generator_function() + part_path = os.path.join(part_dir, os.path.basename(input_file)) + part_file = open(part_path, 'w') + log.debug("Writing %s part to %s" % (input_file, part_path)) + start_offset = f.tell() + part_file.write(line) + except Exception, e: + log.error('Unable to size split FASTA file: %s' % str(e)) + f.close() + if part_file is not None: + part_file.close() + raise + f.close() + _size_split = classmethod(_size_split) + + def _count_split(cls, input_file, chunk_size, subdir_generator_function): + """Split a FASTA file into chunks based on counting records.""" log.debug("Attemping to split FASTA file %s into chunks of %i sequences" \ % (input_file, chunk_size)) f = open(input_file, "rU") @@ -313,13 +379,13 @@ part_file.write(line) part_file.close() except Exception, e: - log.error('Unable to split FASTA file: %s' % str(e)) + log.error('Unable to count split FASTA file: %s' % str(e)) f.close() if part_file is not None: part_file.close() raise f.close() - split = classmethod(split) + _count_split = classmethod(_count_split) class csFasta( Sequence ): """ Class representing the SOLID Color-Space sequence ( csfasta ) """ 
https://bitbucket.org/galaxy/galaxy-central/changeset/1fb89ae798be/ changeset: 1fb89ae798be branch: split_blast user: peterjc date: 2012-02-17 13:24:01 summary: BLAST wrappers: Split FASTA query into parts affected #: 5 files diff -r 26a0c0aa776d5ab557263e3704c2cb6e2fce7a30 -r 1fb89ae798bee0a08d4bb7b3ea08be87b4410531 tools/ncbi_blast_plus/ncbi_blastn_wrapper.xml --- a/tools/ncbi_blast_plus/ncbi_blastn_wrapper.xml +++ b/tools/ncbi_blast_plus/ncbi_blastn_wrapper.xml @@ -1,7 +1,7 @@ <tool id="ncbi_blastn_wrapper" name="NCBI BLAST+ blastn" version="0.0.11"><description>Search nucleotide database with nucleotide query sequence(s)</description><!-- If job splitting is enabled, break up the query file into batches of 500 sequences --> - <parallelism method="multi" split_inputs="query" split_mode="to_size" split_size="500" shared_inputs="subject" merge_outputs="output1"></parallelism> + <parallelism method="multi" split_inputs="query" split_mode="number_of_parts" split_size="4" shared_inputs="subject" merge_outputs="output1"></parallelism><version_command>blastn -version</version_command><command interpreter="python">hide_stderr.py ## The command is a Cheetah template which allows some Python based syntax. 
diff -r 26a0c0aa776d5ab557263e3704c2cb6e2fce7a30 -r 1fb89ae798bee0a08d4bb7b3ea08be87b4410531 tools/ncbi_blast_plus/ncbi_blastp_wrapper.xml --- a/tools/ncbi_blast_plus/ncbi_blastp_wrapper.xml +++ b/tools/ncbi_blast_plus/ncbi_blastp_wrapper.xml @@ -1,7 +1,7 @@ <tool id="ncbi_blastp_wrapper" name="NCBI BLAST+ blastp" version="0.0.11"><description>Search protein database with protein query sequence(s)</description><!-- If job splitting is enabled, break up the query file into batches of 500 sequences --> - <parallelism method="multi" split_inputs="query" split_mode="to_size" split_size="500" shared_inputs="subject" merge_outputs="output1"></parallelism> + <parallelism method="multi" split_inputs="query" split_mode="number_of_parts" split_size="4" shared_inputs="subject" merge_outputs="output1"></parallelism><version_command>blastp -version</version_command><command interpreter="python">hide_stderr.py ## The command is a Cheetah template which allows some Python based syntax. diff -r 26a0c0aa776d5ab557263e3704c2cb6e2fce7a30 -r 1fb89ae798bee0a08d4bb7b3ea08be87b4410531 tools/ncbi_blast_plus/ncbi_blastx_wrapper.xml --- a/tools/ncbi_blast_plus/ncbi_blastx_wrapper.xml +++ b/tools/ncbi_blast_plus/ncbi_blastx_wrapper.xml @@ -1,7 +1,7 @@ <tool id="ncbi_blastx_wrapper" name="NCBI BLAST+ blastx" version="0.0.11"><description>Search protein database with translated nucleotide query sequence(s)</description><!-- If job splitting is enabled, break up the query file into batches of 500 sequences --> - <parallelism method="multi" split_inputs="query" split_mode="to_size" split_size="500" shared_inputs="subject" merge_outputs="output1"></parallelism> + <parallelism method="multi" split_inputs="query" split_mode="number_of_parts" split_size="4" shared_inputs="subject" merge_outputs="output1"></parallelism><version_command>blastx -version</version_command><command interpreter="python">hide_stderr.py ## The command is a Cheetah template which allows some Python based syntax. 
diff -r 26a0c0aa776d5ab557263e3704c2cb6e2fce7a30 -r 1fb89ae798bee0a08d4bb7b3ea08be87b4410531 tools/ncbi_blast_plus/ncbi_tblastn_wrapper.xml --- a/tools/ncbi_blast_plus/ncbi_tblastn_wrapper.xml +++ b/tools/ncbi_blast_plus/ncbi_tblastn_wrapper.xml @@ -1,7 +1,7 @@ <tool id="ncbi_tblastn_wrapper" name="NCBI BLAST+ tblastn" version="0.0.11"><description>Search translated nucleotide database with protein query sequence(s)</description><!-- If job splitting is enabled, break up the query file into batches of 500 sequences --> - <parallelism method="multi" split_inputs="query" split_mode="to_size" split_size="500" shared_inputs="subject" merge_outputs="output1"></parallelism> + <parallelism method="multi" split_inputs="query" split_mode="number_of_parts" split_size="4" shared_inputs="subject" merge_outputs="output1"></parallelism><version_command>tblastn -version</version_command><command interpreter="python">hide_stderr.py ## The command is a Cheetah template which allows some Python based syntax. 
diff -r 26a0c0aa776d5ab557263e3704c2cb6e2fce7a30 -r 1fb89ae798bee0a08d4bb7b3ea08be87b4410531 tools/ncbi_blast_plus/ncbi_tblastx_wrapper.xml --- a/tools/ncbi_blast_plus/ncbi_tblastx_wrapper.xml +++ b/tools/ncbi_blast_plus/ncbi_tblastx_wrapper.xml @@ -1,7 +1,7 @@ <tool id="ncbi_tblastx_wrapper" name="NCBI BLAST+ tblastx" version="0.0.11"><description>Search translated nucleotide database with translated nucleotide query sequence(s)</description><!-- If job splitting is enabled, break up the query file into batches of 500 sequences --> - <parallelism method="multi" split_inputs="query" split_mode="to_size" split_size="500" shared_inputs="subject" merge_outputs="output1"></parallelism> + <parallelism method="multi" split_inputs="query" split_mode="number_of_parts" split_size="4" shared_inputs="subject" merge_outputs="output1"></parallelism><version_command>tblastx -version</version_command><command interpreter="python">hide_stderr.py ## The command is a Cheetah template which allows some Python based syntax. 
https://bitbucket.org/galaxy/galaxy-central/changeset/c2f30968b034/ changeset: c2f30968b034 branch: split_blast user: peterjc date: 2012-02-17 16:27:19 summary: Merged stdout/stderr was missing newline between tasks affected #: 1 file diff -r 1fb89ae798bee0a08d4bb7b3ea08be87b4410531 -r c2f30968b0343decb3dbc985d2fc948df489e453 lib/galaxy/jobs/splitters/multi.py --- a/lib/galaxy/jobs/splitters/multi.py +++ b/lib/galaxy/jobs/splitters/multi.py @@ -152,8 +152,8 @@ out = tw.get_task().stdout.strip() err = tw.get_task().stderr.strip() if len(out) > 0: - stdout += tw.working_directory + ':\n' + out + stdout += "\n" + tw.working_directory + ':\n' + out if len(err) > 0: - stderr += tw.working_directory + ':\n' + err + stderr += "\n" + tw.working_directory + ':\n' + err return (stdout, stderr) - + https://bitbucket.org/galaxy/galaxy-central/changeset/2d74c0a4e931/ changeset: 2d74c0a4e931 branch: split_blast user: peterjc date: 2012-02-17 17:03:24 summary: Explicit failure for merging XML files affected #: 1 file diff -r c2f30968b0343decb3dbc985d2fc948df489e453 -r 2d74c0a4e9314826efb02458980cc2d5e58678e7 lib/galaxy/datatypes/xml.py --- a/lib/galaxy/datatypes/xml.py +++ b/lib/galaxy/datatypes/xml.py @@ -42,6 +42,13 @@ #TODO - Is there a more robust way to do this? 
return line.startswith('<?xml ') + def merge(split_files, output_file): + """Merging multiple XML files is non-trivial and must be done in subclasses.""" + if len(split_files) > 1: + raise NotImplementedError("Merging multiple XML files is non-trivial and must be implemented for each XML type") + #For one file only, use base class method (move/copy) + data.Text.merge(split_files, output_file) + merge = staticmethod(merge) class BlastXml( GenericXml ): """NCBI Blast XML Output data""" https://bitbucket.org/galaxy/galaxy-central/changeset/ebf65c0b1e26/ changeset: ebf65c0b1e26 branch: split_blast user: peterjc date: 2012-02-22 19:43:23 summary: Basic BLAST XML merge implementation affected #: 1 file diff -r 2d74c0a4e9314826efb02458980cc2d5e58678e7 -r ebf65c0b1e26a17d5d78f70ec19eccfc800d06fd lib/galaxy/datatypes/xml.py --- a/lib/galaxy/datatypes/xml.py +++ b/lib/galaxy/datatypes/xml.py @@ -93,7 +93,51 @@ return False handle.close() return True - + + def merge(split_files, output_file): + """Merging multiple XML files is non-trivial and must be done in subclasses.""" + if len(split_files) == 1: + #For one file only, use base class method (move/copy) + return data.Text.merge(split_files, output_file) + out = open(output_file, "w") + h = None + for f in split_files: + h = open(f) + body = False + header = [] + while True: + line = h.readline() + header.append(line) + if "<Iteration>" in line: + break + header = "".join(header) + if "<BlastOutput>" not in header: + out.close() + h.close() + raise ValueError("%s is not a BLAST XML file:\n%s\n..." 
% (f, header)) + if f == split_files[0]: + out.write(header) + old_header = header + elif old_header[:300] != header[:300]: + #Enough to check <BlastOutput_program> and <BlastOutput_version> match + out.close() + h.close() + raise ValueError("BLAST XML headers don't match for %s and %s - have:\n%s\n...\n\nAnd:\n%s\n...\n" \ + % (split_files[0], f, old_header[:300], header[:300])) + else: + out.write(" <Iteration>\n") + for line in h: + if "</BlastOutput_iterations>" in line: + break + #TODO - Increment <Iteration_iter-num> and if required automatic query names + #like <Iteration_query-ID>Query_3</Iteration_query-ID> to be increasing? + out.write(line) + h.close() + out.write(" </BlastOutput_iterations>\n") + out.write("</BlastOutput>\n") + out.close() + merge = staticmethod(merge) + class MEMEXml( GenericXml ): """MEME XML Output data""" Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
participants (1)
- Bitbucket