1 new commit in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/commits/3f85113b7476/
Changeset:   3f85113b7476
User:        carlfeberhard
Date:        2013-08-14 17:42:25
Summary:     Dataproviders, QA: fix counters in FilteredDataProvider, rewrite BlockDataProvider, remove instance checks from dataset dprovs to prevent circular imports; add unit tests for line dprovs
Affected #:  5 files

diff -r 009088d5e76fb00794da78dc0ee3cdaa8524b7d8 -r 3f85113b7476767ef1dc76c7f3312b423ff743db lib/galaxy/datatypes/dataproviders/base.py
--- a/lib/galaxy/datatypes/dataproviders/base.py
+++ b/lib/galaxy/datatypes/dataproviders/base.py
@@ -247,17 +247,22 @@
         `offset`, begin providing data, and stop when `num_data_returned`
         is greater than `offset`.
         """
+        if self.limit != None and self.limit <= 0:
+            return
+            yield
+
         parent_gen = super( LimitedOffsetDataProvider, self ).__iter__()
         for datum in parent_gen:
-
-            if self.limit != None and self.num_data_returned > self.limit:
-                break
+            self.num_data_returned -= 1
+            #print 'self.num_data_returned:', self.num_data_returned
+            #print 'self.num_valid_data_read:', self.num_valid_data_read

             if self.num_valid_data_read > self.offset:
+                self.num_data_returned += 1
                 yield datum
-            else:
-                # wot a cheezy way of doing this...
-                self.num_data_returned -= 1
+
+            if self.limit != None and self.num_data_returned >= self.limit:
+                break

 #TODO: skipping lines is inefficient - somehow cache file position/line_num pair and allow provider
 #  to seek to a pos/line and then begin providing lines
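
The rewritten LimitedOffsetDataProvider.__iter__ above reduces to a small
generator pattern: yield only data past `offset`, stop once `limit` data have
been returned, and short-circuit a `limit` of zero or less. A minimal,
standalone sketch of that pattern (stand-in names, not the actual Galaxy
class):

    def limited_offset( source, limit=None, offset=0 ):
        # a limit of zero (or less) yields nothing at all
        if limit is not None and limit <= 0:
            return
        num_valid_data_read = 0
        num_data_returned = 0
        for datum in source:
            num_valid_data_read += 1
            # skip data until we are past the offset
            if num_valid_data_read > offset:
                num_data_returned += 1
                yield datum
            # stop as soon as the limit is reached
            if limit is not None and num_data_returned >= limit:
                break

    # e.g. list( limited_offset( iter( 'abcde' ), limit=2, offset=1 ) ) == [ 'b', 'c' ]

diff -r 009088d5e76fb00794da78dc0ee3cdaa8524b7d8 -r 3f85113b7476767ef1dc76c7f3312b423ff743db lib/galaxy/datatypes/dataproviders/dataset.py
--- a/lib/galaxy/datatypes/dataproviders/dataset.py
+++ b/lib/galaxy/datatypes/dataproviders/dataset.py
@@ -6,19 +6,13 @@
     (e.g. parsing genomic regions from their source)
 """

+from galaxy import eggs
+
 import pkg_resources
 pkg_resources.require( 'bx-python' )
 from bx import seq as bx_seq
 from bx import wiggle as bx_wig

-import galaxy.model
-import galaxy.datatypes
-import galaxy.datatypes.data
-
-#TODO: can't import these due to circular ref in model/registry
-#import galaxy.datatypes.binary
-#import galaxy.datatypes.tabular
-
 import exceptions
 import base
 import line
@@ -27,8 +21,9 @@

 _TODO = """
 use bx as much as possible
-the use of DatasetInstance seems to create some import problems
 gff3 hierarchies
+
+change SamtoolsDataProvider to use pysam
 """

 import logging
@@ -50,11 +45,8 @@
         """
         :param dataset: the Galaxy dataset whose file will be the source
         :type dataset: model.DatasetInstance
-
-        :raises exceptions.InvalidDataProviderSource: if not a DatasetInstance
         """
-        if not isinstance( dataset, galaxy.model.DatasetInstance ):
-            raise exceptions.InvalidDataProviderSource( "Data provider can only be used with a DatasetInstance" )
+        #precondition: dataset is a galaxy.model.DatasetInstance
         self.dataset = dataset
         # this dataset file is obviously the source
         #TODO: this might be a good place to interface with the object_store...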
@@ -615,13 +607,7 @@
         """
         #TODO: into validate_source

-        #TODO: have to import these here due to circular ref in model/datatypes
-        import galaxy.datatypes.binary
-        import galaxy.datatypes.tabular
-        if( not( isinstance( dataset.datatype, galaxy.datatypes.tabular.Sam )
-            or   isinstance( dataset.datatype, galaxy.datatypes.binary.Bam ) ) ):
-            raise exceptions.InvalidDataProviderSource(
-                'dataset must be a Sam or Bam datatype: %s' %( str( dataset.datatype ) ) )
+        #precondition: dataset.datatype is a tabular.Sam or binary.Bam
         self.dataset = dataset

         options_dict = options_dict or {}
@@ -661,8 +647,9 @@
         validated_flag_list = set([ flag for flag in options_string if flag in self.FLAGS_WO_ARGS ])

         # if sam add -S
-        if( ( isinstance( self.dataset.datatype, galaxy.datatypes.tabular.Sam )
-        and ( 'S' not in validated_flag_list ) ) ):
+        #TODO: not the best test in the world...
+        if( ( self.dataset.ext == 'sam' )
+        and ( 'S' not in validated_flag_list ) ):
             validated_flag_list.append( 'S' )

         if validated_flag_list:

diff -r 009088d5e76fb00794da78dc0ee3cdaa8524b7d8 -r 3f85113b7476767ef1dc76c7f3312b423ff743db lib/galaxy/datatypes/dataproviders/line.py
--- a/lib/galaxy/datatypes/dataproviders/line.py
+++ b/lib/galaxy/datatypes/dataproviders/line.py
@@ -28,18 +28,25 @@
     """
     DEFAULT_COMMENT_CHAR = '#'
     settings = {
-        'string_lines'  : 'bool',
+        'strip_lines'   : 'bool',
+        'strip_newlines': 'bool',
         'provide_blank' : 'bool',
         'comment_char'  : 'str',
     }

-    def __init__( self, source, strip_lines=True, provide_blank=False, comment_char=DEFAULT_COMMENT_CHAR, **kwargs ):
+    def __init__( self, source, strip_lines=True, strip_newlines=False, provide_blank=False,
+                  comment_char=DEFAULT_COMMENT_CHAR, **kwargs ):
         """
         :param strip_lines: remove whitespace from the beginning and ending
             of each line (or not).
             Optional: defaults to True
         :type strip_lines: bool

+        :param strip_newlines: remove newlines only
+            (only functions when ``strip_lines`` is false)
+            Optional: defaults to False
+        :type strip_newlines: bool
+
         :param provide_blank: are empty lines considered valid and provided?
             Optional: defaults to False
         :type provide_blank: bool
@@ -51,6 +58,7 @@
         """
         super( FilteredLineDataProvider, self ).__init__( source, **kwargs )
         self.strip_lines = strip_lines
+        self.strip_newlines = strip_newlines
         self.provide_blank = provide_blank
         self.comment_char = comment_char

@@ -62,17 +70,18 @@
         :type line: str
         :returns: a line or `None`
         """
-        line = super( FilteredLineDataProvider, self ).filter( line )
         if line != None:
-            # is this the proper order?
+            #??: shouldn't it strip newlines regardless; if not, why not use one of the base.dprovs?
             if self.strip_lines:
                 line = line.strip()
+            elif self.strip_newlines:
+                line = line.strip( '\n' )
             if not self.provide_blank and line == '':
                 return None
             elif line.startswith( self.comment_char ):
                 return None

-        return line
+        return super( FilteredLineDataProvider, self ).filter( line )


 class RegexLineDataProvider( FilteredLineDataProvider ):
@@ -108,6 +117,7 @@
     #NOTE: no support for flags

     def filter( self, line ):
+        #NOTE: filter_fn will occur BEFORE any matching
         line = super( RegexLineDataProvider, self ).filter( line )
         if line != None and self.compiled_regex_list:
             line = self.filter_by_regex( line )
@@ -144,16 +154,15 @@
         :type block_filter_fn: function
         """
         # composition - not inheritance
-        #TODO: don't pass any?
-        line_provider = FilteredLineDataProvider( source )
-        super( BlockDataProvider, self ).__init__( line_provider, **kwargs )
+        #TODO: not a fan of this:
+        ( filter_fn, limit, offset ) = ( kwargs.pop( 'filter_fn', None ),
+            kwargs.pop( 'limit', None ), kwargs.pop( 'offset', 0 ) )
+        line_provider = FilteredLineDataProvider( source, **kwargs )
+        super( BlockDataProvider, self ).__init__( line_provider, filter_fn=filter_fn, limit=limit, offset=offset )

         self.new_block_delim_fn = new_block_delim_fn
         self.block_filter_fn = block_filter_fn
         self.init_new_block()
-        # ...well, this is kinda lame - but prevents returning first empty block
-        #TODO: maybe better way in iter
-        self.is_inside_block = False

     def init_new_block( self ):
         """
@@ -161,7 +170,6 @@
         """
         # called in __init__ and after yielding the prev. block
         self.block_lines = collections.deque([])
-        self.block = {}

     def __iter__( self ):
         """
@@ -171,8 +179,8 @@
         for block in parent_gen:
             yield block

-        last_block = self.filter_block( self.assemble_current_block() )
-        if last_block != None and self.num_data_returned < self.limit:
+        last_block = self.handle_last_block()
+        if last_block != None:
             self.num_data_returned += 1
             yield last_block

@@ -186,26 +194,23 @@
         :returns: a block or `None`
         """
         line = super( BlockDataProvider, self ).filter( line )
+        #HACK
+        self.num_data_read -= 1
         if line == None:
             return None

+        block_to_return = None
         if self.is_new_block( line ):

             # if we're already in a block, return the prev. block and add the line to a new block
-            #TODO: maybe better way in iter
-            if self.is_inside_block:
-                filtered_block = self.filter_block( self.assemble_current_block() )
+            if self.block_lines:
+                block_to_return = self.assemble_current_block()
+                block_to_return = self.filter_block( block_to_return )
+                self.num_data_read += 1
+
                 self.init_new_block()
-                self.add_line_to_block( line )
-
-                # return an assembled block datum if it passed the filter
-                if filtered_block != None:
-                    return filtered_block
-
-            else:
-                self.is_inside_block = True

         self.add_line_to_block( line )
-        return None
+        return block_to_return

     def is_new_block( self, line ):
         """
@@ -239,7 +244,6 @@
         Called per block (just before providing).
         """
         # empty block_lines and assemble block
-        # NOTE: we don't want to have mem == 2*data here so - careful
         return list( ( self.block_lines.popleft() for i in xrange( len( self.block_lines ) ) ) )

     def filter_block( self, block ):
         """
@@ -251,3 +255,20 @@
         if self.block_filter_fn:
             return self.block_filter_fn( block )
         return block
+
+    def handle_last_block( self ):
+        """
+        Handle any blocks remaining after the main loop.
+        """
+        if self.limit != None and self.num_data_returned >= self.limit:
+            return None
+
+        last_block = self.assemble_current_block()
+        self.num_data_read += 1
+
+        last_block = self.filter_block( last_block )
+        if last_block != None:
+            self.num_valid_data_read += 1
+
+        return last_block
+
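
Stripped of the counter bookkeeping, the BlockDataProvider rewrite above
implements a simple accumulate-and-flush pattern: buffer lines until the
delimiter test marks the start of a new block, emit the finished block through
the optional block filter, and flush whatever remains when the source is
exhausted (the new handle_last_block). A standalone sketch under those same
semantics (an illustration, not Galaxy's actual class):

    import collections

    def blocks( lines, new_block_delim_fn, block_filter_fn=None ):
        block_lines = collections.deque()
        for line in lines:
            # a delimiter line closes the previous block (if any) before starting a new one
            if new_block_delim_fn( line ) and block_lines:
                block = list( block_lines )
                block_lines.clear()
                block = block_filter_fn( block ) if block_filter_fn else block
                if block is not None:
                    yield block
            block_lines.append( line )
        # flush the last block remaining after the loop
        if block_lines:
            block = list( block_lines )
            block = block_filter_fn( block ) if block_filter_fn else block
            if block is not None:
                yield block

    # e.g. grouping FASTA-style records, as in the new unit tests below:
    # list( blocks( [ '>One', 'ABCD', '>Two', 'ABCD', 'EFGH' ], lambda l: l.startswith( '>' ) ) )
    # == [ [ '>One', 'ABCD' ], [ '>Two', 'ABCD', 'EFGH' ] ]
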
+ """ + if self.limit != None and self.num_data_returned >= self.limit: + return None + + last_block = self.assemble_current_block() + self.num_data_read += 1 + + last_block = self.filter_block( last_block ) + if last_block != None: + self.num_valid_data_read += 1 + + return last_block + diff -r 009088d5e76fb00794da78dc0ee3cdaa8524b7d8 -r 3f85113b7476767ef1dc76c7f3312b423ff743db test/unit/datatypes/dataproviders/test_base_dataproviders.py --- a/test/unit/datatypes/dataproviders/test_base_dataproviders.py +++ b/test/unit/datatypes/dataproviders/test_base_dataproviders.py @@ -15,8 +15,9 @@ log = utility.set_up_filelogger( __name__ + '.log' ) -utility.add_galaxy_lib_to_path( '/test/unit/datatypes/dataproviders' ) -from galaxy.datatypes import dataproviders +utility.add_galaxy_lib_to_path( 'test/unit/datatypes/dataproviders' ) +from galaxy.datatypes.dataproviders import base, exceptions +from galaxy import eggs class BaseTestCase( unittest.TestCase ): @@ -53,9 +54,12 @@ log.debug( 'file contents:\n%s', contents ) return contents + def parses_default_content_as( self ): + return [ 'One\n', 'Two\n', 'Three\n' ] + class Test_BaseDataProvider( BaseTestCase ): - provider_class = dataproviders.base.DataProvider + provider_class = base.DataProvider def contents_provider_and_data( self, filename=None, contents=None, source=None, *provider_args, **provider_kwargs ): @@ -74,38 +78,38 @@ return ( contents, provider, data ) def test_iterators( self ): - source = ( x for x in xrange( 1, 10 ) ) + source = ( str( x ) for x in xrange( 1, 10 ) ) provider = self.provider_class( source ) data = list( provider ) log.debug( 'data: %s', str( data ) ) - self.assertEqual( data, [ x for x in xrange( 1, 10 ) ] ) + self.assertEqual( data, [ str( x ) for x in xrange( 1, 10 ) ] ) - source = [ x for x in xrange( 1, 10 ) ] + source = ( str( x ) for x in xrange( 1, 10 ) ) provider = self.provider_class( source ) data = list( provider ) log.debug( 'data: %s', str( data ) ) - self.assertEqual( data, [ x for x in xrange( 1, 10 ) ] ) + self.assertEqual( data, [ str( x ) for x in xrange( 1, 10 ) ] ) - source = ( x for x in xrange( 1, 10 ) ) + source = ( str( x ) for x in xrange( 1, 10 ) ) provider = self.provider_class( source ) data = list( provider ) log.debug( 'data: %s', str( data ) ) - self.assertEqual( data, [ x for x in xrange( 1, 10 ) ] ) + self.assertEqual( data, [ str( x ) for x in xrange( 1, 10 ) ] ) def test_validate_source( self ): """validate_source should throw an error if the source doesn't have attr '__iter__' """ def non_iterator_dprov( source ): return self.provider_class( source ) - self.assertRaises( dataproviders.exceptions.InvalidDataProviderSource, + self.assertRaises( exceptions.InvalidDataProviderSource, non_iterator_dprov, 'one two three' ) - self.assertRaises( dataproviders.exceptions.InvalidDataProviderSource, + self.assertRaises( exceptions.InvalidDataProviderSource, non_iterator_dprov, 40 ) def test_writemethods( self ): """should throw an error if any write methods are called """ - source = ( x for x in xrange( 1, 10 ) ) + source = ( str( x ) for x in xrange( 1, 10 ) ) provider = self.provider_class( source ) # should throw error def call_method( provider, method_name, *args ): @@ -118,11 +122,11 @@ def test_readlines( self ): """readlines should return all the data in list form """ - source = ( x for x in xrange( 1, 10 ) ) + source = ( str( x ) for x in xrange( 1, 10 ) ) provider = self.provider_class( source ) data = provider.readlines() log.debug( 'data: %s', str( data ) ) - self.assertEqual( 
+        self.assertEqual( data, [ str( x ) for x in xrange( 1, 10 ) ] )

     def test_stringio( self ):
         """should work with StringIO
@@ -137,21 +141,21 @@
         data = list( provider )
         log.debug( 'data: %s', str( data ) )
         # provider should call close on file
-        self.assertEqual( ''.join( data ), contents )
+        self.assertEqual( data, self.parses_default_content_as() )
         self.assertTrue( source.closed )

     def test_file( self ):
         """should work with files
         """
         ( contents, provider, data ) = self.contents_provider_and_data()
-        self.assertEqual( ''.join( data ), contents )
+        self.assertEqual( data, self.parses_default_content_as() )
         # provider should call close on file
         self.assertTrue( isinstance( provider.source, file ) )
         self.assertTrue( provider.source.closed )


 class Test_FilteredDataProvider( Test_BaseDataProvider ):
-    provider_class = dataproviders.base.FilteredDataProvider
+    provider_class = base.FilteredDataProvider

     def assertCounters( self, provider, read, valid, returned ):
         self.assertEqual( provider.num_data_read, read )
@@ -177,13 +181,13 @@


 class Test_LimitedOffsetDataProvider( Test_FilteredDataProvider ):
-    provider_class = dataproviders.base.LimitedOffsetDataProvider
+    provider_class = base.LimitedOffsetDataProvider

     def test_offset_1( self ):
         """when offset is 1, should skip first
         """
         ( contents, provider, data ) = self.contents_provider_and_data( offset=1 )
-        self.assertEqual( data, [ 'Two\n', 'Three\n' ] )
+        self.assertEqual( data, self.parses_default_content_as()[1:] )
         self.assertCounters( provider, 3, 3, 2 )

     def test_offset_all( self ):
@@ -197,29 +201,28 @@
         """when offset is 0, should return all
         """
         ( contents, provider, data ) = self.contents_provider_and_data( offset=0 )
-        self.assertEqual( ''.join( data ), contents )
+        self.assertEqual( data, self.parses_default_content_as() )
         self.assertCounters( provider, 3, 3, 3 )

     def test_offset_negative( self ):
         """when offset is negative, should return all
         """
         ( contents, provider, data ) = self.contents_provider_and_data( offset=-1 )
-        self.assertEqual( ''.join( data ), contents )
+        self.assertEqual( data, self.parses_default_content_as() )
         self.assertCounters( provider, 3, 3, 3 )

     def test_limit_1( self ):
         """when limit is one, should return first
         """
         ( contents, provider, data ) = self.contents_provider_and_data( limit=1 )
-        self.assertEqual( data, [ 'One\n' ] )
-        #TODO: currently reads 2 in all counters before ending
-        #self.assertCounters( provider, 1, 1, 1 )
+        self.assertEqual( data, self.parses_default_content_as()[:1] )
+        self.assertCounters( provider, 1, 1, 1 )

     def test_limit_all( self ):
         """when limit >= num lines, should return all
         """
         ( contents, provider, data ) = self.contents_provider_and_data( limit=4 )
-        self.assertEqual( ''.join( data ), contents )
+        self.assertEqual( data, self.parses_default_content_as() )
         self.assertCounters( provider, 3, 3, 3 )

     def test_limit_zero( self ):
@@ -227,14 +230,13 @@
         """
         ( contents, provider, data ) = self.contents_provider_and_data( limit=0 )
         self.assertEqual( data, [] )
-        #TODO: currently reads 1 before ending
-        self.assertCounters( provider, 3, 0, 0 )
+        self.assertCounters( provider, 0, 0, 0 )

     def test_limit_none( self ):
         """when limit is None, should return all
         """
         ( contents, provider, data ) = self.contents_provider_and_data( limit=None )
-        self.assertEqual( ''.join( data ), contents )
+        self.assertEqual( data, self.parses_default_content_as() )
         self.assertCounters( provider, 3, 3, 3 )

     #TODO: somehow re-use tmpfile here
@@ -243,17 +245,18 @@
             ( contents, provider, data ) = self.contents_provider_and_data( limit=limit, offset=offset )
             self.assertEqual( data, data_should_be )
             #self.assertCounters( provider, read, valid, returned )
+        result_data = self.parses_default_content_as()
         test_data = [
             ( 0, 0, [], 0, 0, 0 ),
-            ( 1, 0, [ 'One\n' ], 1, 1, 1 ),
-            ( 2, 0, [ 'One\n', 'Two\n' ], 2, 2, 2 ),
-            ( 3, 0, [ 'One\n', 'Two\n', 'Three\n' ], 3, 3, 3 ),
-            ( 1, 1, [ 'Two\n' ], 1, 1, 1 ),
-            ( 2, 1, [ 'Two\n', 'Three\n' ], 2, 2, 2 ),
-            ( 3, 1, [ 'Two\n', 'Three\n' ], 2, 2, 2 ),
-            ( 1, 2, [ 'Three\n' ], 1, 1, 1 ),
-            ( 2, 2, [ 'Three\n' ], 1, 1, 1 ),
-            ( 3, 2, [ 'Three\n' ], 1, 1, 1 ),
+            ( 1, 0, self.parses_default_content_as()[:1], 1, 1, 1 ),
+            ( 2, 0, self.parses_default_content_as()[:2], 2, 2, 2 ),
+            ( 3, 0, self.parses_default_content_as()[:3], 3, 3, 3 ),
+            ( 1, 1, self.parses_default_content_as()[1:2], 1, 1, 1 ),
+            ( 2, 1, self.parses_default_content_as()[1:3], 2, 2, 2 ),
+            ( 3, 1, self.parses_default_content_as()[1:3], 2, 2, 2 ),
+            ( 1, 2, self.parses_default_content_as()[2:3], 1, 1, 1 ),
+            ( 2, 2, self.parses_default_content_as()[2:3], 1, 1, 1 ),
+            ( 3, 2, self.parses_default_content_as()[2:3], 1, 1, 1 ),
         ]
         for test in test_data:
             log.debug( 'limit_offset_combo: %s', ', '.join([ str( e ) for e in test ]) )
@@ -269,14 +272,15 @@
                 limit=limit, offset=offset, filter_fn=only_ts )
             self.assertEqual( data, data_should_be )
             #self.assertCounters( provider, read, valid, returned )
+        result_data = [ c for c in self.parses_default_content_as() if c.lower().startswith( 't' ) ]
         test_data = [
             ( 0, 0, [], 0, 0, 0 ),
-            ( 1, 0, [ 'Two\n' ], 1, 1, 1 ),
-            ( 2, 0, [ 'Two\n', 'Three\n' ], 2, 2, 2 ),
-            ( 3, 0, [ 'Two\n', 'Three\n' ], 2, 2, 2 ),
-            ( 1, 1, [ 'Three\n' ], 1, 1, 1 ),
-            ( 2, 1, [ 'Three\n' ], 1, 1, 1 ),
-            ( 1, 2, [], 0, 0, 0 ),
+            ( 1, 0, result_data[:1], 1, 1, 1 ),
+            ( 2, 0, result_data[:2], 2, 2, 2 ),
+            ( 3, 0, result_data[:3], 2, 2, 2 ),
+            ( 1, 1, result_data[1:2], 1, 1, 1 ),
+            ( 2, 1, result_data[1:3], 1, 1, 1 ),
+            ( 1, 2, result_data[2:3], 0, 0, 0 ),
         ]
         for test in test_data:
             log.debug( 'limit_offset_combo: %s', ', '.join([ str( e ) for e in test ]) )
@@ -284,7 +288,7 @@


 class Test_MultiSourceDataProvider( BaseTestCase ):
-    provider_class = dataproviders.base.MultiSourceDataProvider
+    provider_class = base.MultiSourceDataProvider

     def contents_and_tmpfile( self, contents=None ):
         #TODO: hmmmm...
@@ -355,9 +359,9 @@
         def no_youtube( string ):
             return None if ( 'youtu.be' in string ) else string
         source_list = [
-            dataproviders.base.LimitedOffsetDataProvider( source_list[0], filter_fn=no_Fs, limit=2, offset=1 ),
-            dataproviders.base.LimitedOffsetDataProvider( source_list[1], limit=1, offset=3 ),
-            dataproviders.base.FilteredDataProvider( source_list[2], filter_fn=no_youtube ),
+            base.LimitedOffsetDataProvider( source_list[0], filter_fn=no_Fs, limit=2, offset=1 ),
+            base.LimitedOffsetDataProvider( source_list[1], limit=1, offset=3 ),
+            base.FilteredDataProvider( source_list[2], filter_fn=no_youtube ),
         ]
         provider = self.provider_class( source_list )
         log.debug( 'provider: %s', provider )
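
The assertCounters calls throughout these tests check the three
FilteredDataProvider counters whose off-by-one errors this commit fixes. A
rough sketch of their intended semantics (an illustration with stand-in names,
not the class itself):

    def filtered( source, filter_fn=None, counters=None ):
        # counters, as assertCounters( provider, read, valid, returned ) checks them:
        #   num_data_read       - every datum read from the source
        #   num_valid_data_read - data that survived the filter
        #   num_data_returned   - data actually yielded to the caller
        counters = counters if counters is not None else {}
        counters.update( num_data_read=0, num_valid_data_read=0, num_data_returned=0 )
        for datum in source:
            counters[ 'num_data_read' ] += 1
            if filter_fn is not None:
                datum = filter_fn( datum )
            if datum is None:
                continue
            counters[ 'num_valid_data_read' ] += 1
            counters[ 'num_data_returned' ] += 1
            yield datum

With no limit or offset the last two counters agree; subclasses such as
LimitedOffsetDataProvider make num_data_returned lag behind num_valid_data_read.
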
diff -r 009088d5e76fb00794da78dc0ee3cdaa8524b7d8 -r 3f85113b7476767ef1dc76c7f3312b423ff743db test/unit/datatypes/dataproviders/test_line_dataproviders.py
--- /dev/null
+++ b/test/unit/datatypes/dataproviders/test_line_dataproviders.py
@@ -0,0 +1,298 @@
+"""
+Unit tests for line DataProviders.
+.. seealso:: galaxy.datatypes.dataproviders.line
+"""
+# currently because of dataproviders.dataset importing galaxy.model this doesn't work
+#TODO: fix imports there after dist and retry
+
+#TODO: fix off by ones in FilteredDataProvider counters
+
+import unittest
+import StringIO
+
+import tempfilecache
+import utility
+
+import test_base_dataproviders
+
+log = utility.set_up_filelogger( __name__ + '.log' )
+
+utility.add_galaxy_lib_to_path( 'test/unit/datatypes/dataproviders' )
+from galaxy import eggs
+from galaxy.datatypes.dataproviders import line
+
+_TODO = """
+TestCase hierarchy is a bit of a mess here.
+"""
+
+
+class Test_FilteredLineDataProvider( test_base_dataproviders.Test_FilteredDataProvider ):
+    provider_class = line.FilteredLineDataProvider
+    default_file_contents = """
+            # this should be stripped out
+            One
+            # as should blank lines
+
+            # preceding/trailing whitespace too
+                Two
+            Three
+        """
+
+    def parses_default_content_as( self ):
+        return [ 'One', 'Two', 'Three' ]
+
+    def test_counters( self ):
+        """should count: lines read, lines that passed the filter, lines returned
+        """
+        ( contents, provider, data ) = self.contents_provider_and_data()
+        self.assertCounters( provider, 7, 3, 3 )
+
+    def test_filter_fn( self ):
+        """should filter out lines using filter_fn and set counters properly
+        based on filter
+        """
+        def filter_ts( string ):
+            if string.lower().startswith( 't' ):
+                return None
+            return string
+        ( contents, provider, data ) = self.contents_provider_and_data( filter_fn=filter_ts )
+        self.assertCounters( provider, 7, 1, 1 )
+
+    def test_limit_with_offset( self ):
+        def limit_offset_combo( limit, offset, data_should_be, read, valid, returned ):
+            ( contents, provider, data ) = self.contents_provider_and_data( limit=limit, offset=offset )
+            self.assertEqual( data, data_should_be )
+            #self.assertCounters( provider, read, valid, returned )
+        result_data = self.parses_default_content_as()
+        test_data = [
+            ( 0, 0, [], 0, 0, 0 ),
+            ( 1, 0, self.parses_default_content_as()[:1], 1, 1, 1 ),
+            ( 2, 0, self.parses_default_content_as()[:2], 2, 2, 2 ),
+            ( 3, 0, self.parses_default_content_as()[:3], 3, 3, 3 ),
+            ( 1, 1, self.parses_default_content_as()[1:2], 1, 1, 1 ),
+            ( 2, 1, self.parses_default_content_as()[1:3], 2, 2, 2 ),
+            ( 3, 1, self.parses_default_content_as()[1:3], 2, 2, 2 ),
+            ( 1, 2, self.parses_default_content_as()[2:3], 1, 1, 1 ),
+            ( 2, 2, self.parses_default_content_as()[2:3], 1, 1, 1 ),
+            ( 3, 2, self.parses_default_content_as()[2:3], 1, 1, 1 ),
+        ]
+        for test in test_data:
+            log.debug( 'limit_offset_combo: %s', ', '.join([ str( e ) for e in test ]) )
+            limit_offset_combo( *test )
+
+    def test_provide_blank( self ):
+        """should return blank lines if ``provide_blank`` is true.
+        """
+        ( contents, provider, data ) = self.contents_provider_and_data( provide_blank=True )
+        self.assertEqual( data, [ 'One', '', 'Two', 'Three' ] )
+        self.assertCounters( provider, 7, 4, 4 )
+
+    def test_strip_lines( self ):
+        """should return unstripped lines if ``strip_lines`` is false.
+        """
+        ( contents, provider, data ) = self.contents_provider_and_data( strip_lines=False )
+        self.assertEqual( data, ['One\n', '\n', ' Two\n', 'Three\n'] )
+        self.assertCounters( provider, 7, 4, 4 )
+
+    def test_comment_char( self ):
+        """should skip only lines that begin with ``comment_char``.
+ """ + ( contents, provider, data ) = self.contents_provider_and_data( comment_char='T' ) + self.assertEqual( data, [ '# this should be stripped out', 'One', + '# as should blank lines', '# preceding/trailing whitespace too' ] ) + self.assertCounters( provider, 7, 4, 4 ) + + +class Test_RegexLineDataProvider( Test_FilteredLineDataProvider ): + provider_class = line.RegexLineDataProvider + default_file_contents = """ + # this should be stripped out + One + # as should blank lines + + # preceding/trailing whitespace too + Two + Three + """ + + def test_regex( self ): + """should return lines matching regex (AFTER strip, comments, blanks). + """ + ( contents, provider, data ) = self.contents_provider_and_data( regex_list=[ r'^O' ] ) + self.assertEqual( data, [ 'One' ] ) + self.assertCounters( provider, 7, 1, 1 ) + + def test_regex_list( self ): + """should return regex matches using more than one regex by ORing them. + """ + ( contents, provider, data ) = self.contents_provider_and_data( regex_list=[ r'^O', r'T' ] ) + self.assertEqual( data, [ 'One', 'Two', 'Three' ] ) + self.assertCounters( provider, 7, 3, 3 ) + + def test_inverse( self ): + """should return inverse matches when ``invert`` is true. + """ + ( contents, provider, data ) = self.contents_provider_and_data( regex_list=[ r'^O' ], invert=True ) + self.assertEqual( data, [ 'Two', 'Three' ] ) + self.assertCounters( provider, 7, 2, 2 ) + + def test_regex_no_match( self ): + """should return empty if no regex matches. + """ + ( contents, provider, data ) = self.contents_provider_and_data( regex_list=[ r'^Z' ] ) + self.assertEqual( data, [] ) + self.assertCounters( provider, 7, 0, 0 ) + + def test_regex_w_limit_offset( self ): + """regex should play well with limit and offset + """ + ( contents, provider, data ) = self.contents_provider_and_data( regex_list=[ r'^T' ], limit=1 ) + self.assertEqual( data, [ 'Two' ] ) + #TODO: once again, valid data, returned data is off + self.assertCounters( provider, 6, 1, 1 ) + + ( contents, provider, data ) = self.contents_provider_and_data( regex_list=[ r'^T' ], limit=1, offset=1 ) + self.assertEqual( data, [ 'Three' ] ) + self.assertCounters( provider, 7, 2, 1 ) + + +class Test_BlockDataProvider( test_base_dataproviders.Test_FilteredDataProvider ): + provider_class = line.BlockDataProvider + default_file_contents = """ + One + ABCD + Two + ABCD + EFGH + Three + """ + + def parses_default_content_as( self ): + return [ ['One'], ['ABCD'], ['Two'], ['ABCD'], ['EFGH'], ['Three'] ] + + #TODO: well, this is ham-handed... 
+    def test_stringio( self ): pass
+    def test_iterators( self ): pass
+    def test_readlines( self ): pass
+
+    def test_file( self ):
+        """should work with files
+        """
+        ( contents, provider, data ) = self.contents_provider_and_data()
+        self.assertEqual( data, self.parses_default_content_as() )
+        self.assertTrue( isinstance( provider.source, line.FilteredLineDataProvider ) )
+        self.assertTrue( isinstance( provider.source.source, file ) )
+        # provider should call close on file
+        self.assertTrue( provider.source.source.closed )
+
+    def test_counters( self ):
+        """should count: lines read, lines that passed the filter, lines returned
+        """
+        ( contents, provider, data ) = self.contents_provider_and_data()
+        self.assertCounters( provider, 6, 6, 6 )
+
+    def test_filter_fn( self ):
+        """should filter out lines using filter_fn and set counters properly
+        based on filter
+        """
+        def filter_ts( string ):
+            if string.lower().startswith( 't' ):
+                return None
+            return string
+        ( contents, provider, data ) = self.contents_provider_and_data( filter_fn=filter_ts )
+        # no block fns here, so will parse as lines
+        self.assertEqual( data, [ ['One'], ['ABCD'], ['ABCD'], ['EFGH'] ] )
+        self.assertCounters( provider, 4, 4, 4 )
+
+    def test_new_block_delim_fn( self ):
+        """should return blocks based on ``new_block_delim_fn``
+        """
+        def is_not_indented( line ):
+            strip_diff = len( line ) - len( line.lstrip() )
+            return ( strip_diff == 0 )
+        # in order to use indentation as a delimiter, we need to strip the newlines only
+        ( contents, provider, data ) = self.contents_provider_and_data( strip_lines=False, strip_newlines=True,
+            new_block_delim_fn=is_not_indented )
+        self.assertEqual( data, [[ 'One', ' ABCD' ], [ 'Two', ' ABCD', ' EFGH' ], [ 'Three' ]] )
+        self.assertCounters( provider, 3, 3, 3 )
+
+    def test_block_filter_fn( self ):
+        """should return only blocks that pass ``block_filter_fn``
+        """
+        def is_not_indented( line ):
+            strip_diff = len( line ) - len( line.lstrip() )
+            return ( strip_diff == 0 )
+        #def empty_block( block ):
+        #    if len( block ) <= 1:
+        #        return None
+        #    return block
+        def no_tw( block ):
+            if block[0].startswith( 'Tw' ):
+                return None
+            return block
+        ( contents, provider, data ) = self.contents_provider_and_data( strip_lines=False, strip_newlines=True,
+            new_block_delim_fn=is_not_indented, block_filter_fn=no_tw )
+        self.assertEqual( data, [[ 'One', ' ABCD' ], [ 'Three' ]] )
+        self.assertCounters( provider, 3, 2, 2 )
+
+    def test_hack_block_filter_fn( self ):
+        """should allow other aggregating/mod use in filter_fn
+
+        Although, it would be better to subclass and override assemble_current_block
+        """
+        def is_not_indented( line ):
+            strip_diff = len( line ) - len( line.lstrip() )
+            return ( strip_diff == 0 )
+        def empty_block( block ):
+            if len( block ) <= 1:
+                return None
+            return { 'header': block[0].strip(), 'data': [ b.strip() for b in block[1:] if b.strip() ] }
+        ( contents, provider, data ) = self.contents_provider_and_data(
+            strip_lines=False, strip_newlines=True,
+            new_block_delim_fn=is_not_indented, block_filter_fn=empty_block )
+        self.assertEqual( data, [ { 'header': 'One', 'data': [ 'ABCD' ]},
+                                  { 'header': 'Two', 'data': [ 'ABCD', 'EFGH' ]} ])
+        self.assertCounters( provider, 3, 2, 2 )
+
+    def test_block_filter_fn_w_limit_offset( self ):
+        """should allow both block fns and limit, offset
+        """
+        def is_not_indented( line ):
+            strip_diff = len( line ) - len( line.lstrip() )
+            return ( strip_diff == 0 )
+        def empty_block( block ):
+            if len( block ) <= 1:
+                return None
+            return block
+        ( contents, provider, data ) = self.contents_provider_and_data( strip_lines=False, strip_newlines=True,
+            new_block_delim_fn=is_not_indented, block_filter_fn=empty_block, limit=1 )
+        self.assertEqual( data, [[ 'One', ' ABCD' ]] )
+        self.assertCounters( provider, 1, 1, 1 )
+
+        ( contents, provider, data ) = self.contents_provider_and_data( strip_lines=False, strip_newlines=True,
+            new_block_delim_fn=is_not_indented, block_filter_fn=empty_block, limit=2, offset=1 )
+        self.assertEqual( data, [[ 'Two', ' ABCD', ' EFGH' ]] )
+        self.assertCounters( provider, 3, 2, 1 )
+
+    def test_simple_example( self ):
+        """a simple FASTA-like example using both block fns
+        """
+        file_contents = """
+            >One
+            ABCD
+
+            # this comment (and the blank line above) won't be included
+            >Two
+            ABCD
+            EFGH
+        """
+        def fasta_header( line ):
+            return line.startswith( '>' )
+        def id_seq( block ):
+            return { 'id': block[0][1:], 'seq': ( ''.join( block[1:] ) ) }
+        ( contents, provider, data ) = self.contents_provider_and_data( contents=file_contents,
+            new_block_delim_fn=fasta_header, block_filter_fn=id_seq )
+        self.assertEqual( data, [{ 'id': 'One', 'seq': 'ABCD' }, { 'id': 'Two', 'seq': 'ABCDEFGH' }] )
+        self.assertCounters( provider, 2, 2, 2 )
+
+if __name__ == '__main__':
+    unittest.main()

Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--

This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.