1 new commit in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/commits/edef166c163f/
Changeset:   edef166c163f
User:        carlfeberhard
Date:        2013-07-31 20:07:01
Summary:     Unit tests: add base dataprovider unit tests
Affected #:  6 files

diff -r 4b5f8a81adf16369d828aa82fe82f8141772c1d3 -r edef166c163f30175e98ddb0b98f7993d7d4d5eb .hgignore
--- a/.hgignore
+++ b/.hgignore
@@ -85,6 +85,7 @@
 .coverage
 htmlcov
 run_unit_tests.html
+test/unit/**.log
 
 # Project files
 *.kpf

diff -r 4b5f8a81adf16369d828aa82fe82f8141772c1d3 -r edef166c163f30175e98ddb0b98f7993d7d4d5eb test/unit/datatypes/dataproviders/tempfilecache.py
--- /dev/null
+++ b/test/unit/datatypes/dataproviders/tempfilecache.py
@@ -0,0 +1,47 @@
+
+import os
+import tempfile
+
+import logging
+log = logging.getLogger( __name__ )
+
+class TempFileCache( object ):
+    """
+    Creates and caches tempfiles with/based-on the given contents.
+    """
+    def __init__( self, logger=None ):
+        if logger:
+            global log
+            log = logger
+        super( TempFileCache, self ).__init__()
+        self.clear()
+
+    def clear( self ):
+        self.delete_tmpfiles()
+        self._content_dict = {}
+
+    def create_tmpfile( self, contents ):
+        if not hasattr( self, '_content_dict' ):
+            # initialize the cache if create_tmpfile is somehow called before clear
+            self.clear()
+
+        if contents not in self._content_dict:
+            # create a named tmp and write contents to it, return filename
+            tmpfile = tempfile.NamedTemporaryFile( delete=False )
+            tmpfile.write( contents )
+            tmpfile.close()
+            log.debug( 'created tmpfile.name: %s', tmpfile.name )
+            self._content_dict[ contents ] = tmpfile.name
+
+        else:
+            log.debug( '(cached): %s', self._content_dict[ contents ] )
+        return self._content_dict[ contents ]
+
+    def delete_tmpfiles( self ):
+        if not hasattr( self, '_content_dict' ) or not self._content_dict:
+            return
+        for tmpfile_contents in self._content_dict:
+            tmpfile = self._content_dict[ tmpfile_contents ]
+            if os.path.exists( tmpfile ):
+                log.debug( 'unlinking tmpfile: %s', tmpfile )
+                os.unlink( tmpfile )
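
TempFileCache deduplicates fixture files by content: identical contents map to a single NamedTemporaryFile on disk, and clear() unlinks everything, which the test cases below rely on in tearDown. A minimal usage sketch, assuming it runs from the dataproviders test directory so tempfilecache is importable:

    from tempfilecache import TempFileCache

    cache = TempFileCache()
    # identical contents yield the same cached tempfile path
    path_1 = cache.create_tmpfile( 'One\nTwo\n' )
    path_2 = cache.create_tmpfile( 'One\nTwo\n' )
    assert path_1 == path_2
    # unlink all cached tempfiles and reset the cache
    cache.clear()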
diff -r 4b5f8a81adf16369d828aa82fe82f8141772c1d3 -r edef166c163f30175e98ddb0b98f7993d7d4d5eb test/unit/datatypes/dataproviders/test_base_dataproviders.py
--- /dev/null
+++ b/test/unit/datatypes/dataproviders/test_base_dataproviders.py
@@ -0,0 +1,370 @@
+"""
+Unit tests for base DataProviders.
+.. seealso:: galaxy.datatypes.dataproviders.base
+"""
+# currently, because dataproviders.dataset imports galaxy.model, this doesn't work
+#TODO: fix imports there after dist and retry
+
+#TODO: fix off-by-ones in FilteredDataProvider counters
+
+import unittest
+import StringIO
+
+import tempfilecache
+import utility
+
+log = utility.set_up_filelogger( __name__ + '.log' )
+
+utility.add_galaxy_lib_to_path( '/test/unit/datatypes/dataproviders' )
+from galaxy.datatypes import dataproviders
+
+
+class BaseTestCase( unittest.TestCase ):
+    default_file_contents = """
+        One
+        Two
+        Three
+    """
+
+    @classmethod
+    def setUpClass( cls ):
+        log.debug( 'CLASS %s %s', ( '_' * 40 ), cls.__name__ )
+
+    @classmethod
+    def tearDownClass( cls ):
+        log.debug( 'CLASS %s %s\n\n', ( '_' * 40 ), cls.__name__ )
+
+    def __init__( self, *args ):
+        unittest.TestCase.__init__( self, *args )
+        self.tmpfiles = tempfilecache.TempFileCache( log )
+
+    def setUp( self ):
+        log.debug( 'BEGIN %s %s', ( '.' * 40 ), self._testMethodName )
+        if self._testMethodDoc:
+            log.debug( ' """%s"""', self._testMethodDoc.strip() )
+
+    def tearDown( self ):
+        self.tmpfiles.clear()
+        log.debug( 'END\n' )
+
+    def format_tmpfile_contents( self, contents=None ):
+        contents = contents or self.default_file_contents
+        contents = utility.clean_multiline_string( contents )
+        log.debug( 'file contents:\n%s', contents )
+        return contents
+
+
+class Test_BaseDataProvider( BaseTestCase ):
+    provider_class = dataproviders.base.DataProvider
+
+    def contents_provider_and_data( self,
+            filename=None, contents=None, source=None, *provider_args, **provider_kwargs ):
+        # to remove boilerplate
+        # returns file content string, provider used, and data list
+        if not filename:
+            contents = self.format_tmpfile_contents( contents )
+            filename = self.tmpfiles.create_tmpfile( contents )
+        #TODO: if filename, contents == None
+        if not source:
+            source = open( filename )
+        provider = self.provider_class( source, *provider_args, **provider_kwargs )
+        log.debug( 'provider: %s', provider )
+        data = list( provider )
+        log.debug( 'data: %s', str( data ) )
+        return ( contents, provider, data )
+
+    def test_iterators( self ):
+        source = ( x for x in xrange( 1, 10 ) )
+        provider = self.provider_class( source )
+        data = list( provider )
+        log.debug( 'data: %s', str( data ) )
+        self.assertEqual( data, [ x for x in xrange( 1, 10 ) ] )
+
+        source = [ x for x in xrange( 1, 10 ) ]
+        provider = self.provider_class( source )
+        data = list( provider )
+        log.debug( 'data: %s', str( data ) )
+        self.assertEqual( data, [ x for x in xrange( 1, 10 ) ] )
+
+        source = ( x for x in xrange( 1, 10 ) )
+        provider = self.provider_class( source )
+        data = list( provider )
+        log.debug( 'data: %s', str( data ) )
+        self.assertEqual( data, [ x for x in xrange( 1, 10 ) ] )
+
+    def test_validate_source( self ):
+        """validate_source should throw an error if the source doesn't have attr '__iter__'
+        """
+        def non_iterator_dprov( source ):
+            return self.provider_class( source )
+        self.assertRaises( dataproviders.exceptions.InvalidDataProviderSource,
+                           non_iterator_dprov, 'one two three' )
+        self.assertRaises( dataproviders.exceptions.InvalidDataProviderSource,
+                           non_iterator_dprov, 40 )
+
+    def test_writemethods( self ):
+        """should throw an error if any write methods are called
+        """
+        source = ( x for x in xrange( 1, 10 ) )
+        provider = self.provider_class( source )
+        # should throw error
+        def call_method( provider, method_name, *args ):
+            method = getattr( provider, method_name )
+            return method( *args )
+        self.assertRaises( NotImplementedError, call_method, provider, 'truncate', 20 )
+        self.assertRaises( NotImplementedError, call_method, provider, 'write', 'bler' )
+        self.assertRaises( NotImplementedError, call_method, provider, 'writelines', [ 'one', 'two' ] )
+
+    def test_readlines( self ):
+        """readlines should return all the data in list form
+        """
+        source = ( x for x in xrange( 1, 10 ) )
+        provider = self.provider_class( source )
+        data = provider.readlines()
+        log.debug( 'data: %s', str( data ) )
+        self.assertEqual( data, [ x for x in xrange( 1, 10 ) ] )
+
+    def test_stringio( self ):
+        """should work with StringIO
+        """
+        contents = utility.clean_multiline_string( """
+            One
+            Two
+            Three
+        """ )
+        source = StringIO.StringIO( contents )
+        provider = self.provider_class( source )
+        data = list( provider )
+        log.debug( 'data: %s', str( data ) )
+        # provider should call close on file
+        self.assertEqual( ''.join( data ), contents )
+        self.assertTrue( source.closed )
+
+    def test_file( self ):
+        """should work with files
+        """
+        ( contents, provider, data ) = self.contents_provider_and_data()
+        self.assertEqual( ''.join( data ), contents )
+        # provider should call close on file
+        self.assertTrue( isinstance( provider.source, file ) )
+        self.assertTrue( provider.source.closed )
+
+
+class Test_FilteredDataProvider( Test_BaseDataProvider ):
+    provider_class = dataproviders.base.FilteredDataProvider
+
+    def assertCounters( self, provider, read, valid, returned ):
+        self.assertEqual( provider.num_data_read, read )
+        self.assertEqual( provider.num_valid_data_read, valid )
+        self.assertEqual( provider.num_data_returned, returned )
+
+    def test_counters( self ):
+        """should count: lines read, lines that passed the filter, lines returned
+        """
+        ( contents, provider, data ) = self.contents_provider_and_data()
+        self.assertCounters( provider, 3, 3, 3 )
+
+    def test_filter_fn( self ):
+        """should filter out lines using filter_fn and set counters properly
+        based on filter
+        """
+        def filter_ts( string ):
+            if string.lower().startswith( 't' ):
+                return None
+            return string
+        ( contents, provider, data ) = self.contents_provider_and_data( filter_fn=filter_ts )
+        self.assertCounters( provider, 3, 1, 1 )
+
+
+class Test_LimitedOffsetDataProvider( Test_FilteredDataProvider ):
+    provider_class = dataproviders.base.LimitedOffsetDataProvider
+
+    def test_offset_1( self ):
+        """when offset is 1, should skip first
+        """
+        ( contents, provider, data ) = self.contents_provider_and_data( offset=1 )
+        self.assertEqual( data, [ 'Two\n', 'Three\n' ] )
+        self.assertCounters( provider, 3, 3, 2 )
+
+    def test_offset_all( self ):
+        """when offset >= num lines, should return empty list
+        """
+        ( contents, provider, data ) = self.contents_provider_and_data( offset=4 )
+        self.assertEqual( data, [] )
+        self.assertCounters( provider, 3, 3, 0 )
+
+    def test_offset_zero( self ):
+        """when offset is 0, should return all
+        """
+        ( contents, provider, data ) = self.contents_provider_and_data( offset=0 )
+        self.assertEqual( ''.join( data ), contents )
+        self.assertCounters( provider, 3, 3, 3 )
+
+    def test_offset_negative( self ):
+        """when offset is negative, should return all
+        """
+        ( contents, provider, data ) = self.contents_provider_and_data( offset=-1 )
+        self.assertEqual( ''.join( data ), contents )
+        self.assertCounters( provider, 3, 3, 3 )
+
+    def test_limit_1( self ):
+        """when limit is one, should return first
+        """
+        ( contents, provider, data ) = self.contents_provider_and_data( limit=1 )
+        self.assertEqual( data, [ 'One\n' ] )
+        #TODO: currently reads 2 in all counters before ending
+        #self.assertCounters( provider, 1, 1, 1 )
+
+    def test_limit_all( self ):
+        """when limit >= num lines, should return all
+        """
+        ( contents, provider, data ) = self.contents_provider_and_data( limit=4 )
+        self.assertEqual( ''.join( data ), contents )
+        self.assertCounters( provider, 3, 3, 3 )
+
+    def test_limit_zero( self ):
+        """when limit is 0, should return empty list
+        """
+        ( contents, provider, data ) = self.contents_provider_and_data( limit=0 )
+        self.assertEqual( data, [] )
+        #TODO: currently reads 1 before ending
+        self.assertCounters( provider, 3, 0, 0 )
+
+    def test_limit_none( self ):
+        """when limit is None, should return all
+        """
+        ( contents, provider, data ) = self.contents_provider_and_data( limit=None )
+        self.assertEqual( ''.join( data ), contents )
+        self.assertCounters( provider, 3, 3, 3 )
+
+    #TODO: somehow re-use tmpfile here
+    def test_limit_with_offset( self ):
+        def limit_offset_combo( limit, offset, data_should_be, read, valid, returned ):
+            ( contents, provider, data ) = self.contents_provider_and_data( limit=limit, offset=offset )
+            self.assertEqual( data, data_should_be )
+            #self.assertCounters( provider, read, valid, returned )
+        test_data = [
+            ( 0, 0, [], 0, 0, 0 ),
+            ( 1, 0, [ 'One\n' ], 1, 1, 1 ),
+            ( 2, 0, [ 'One\n', 'Two\n' ], 2, 2, 2 ),
+            ( 3, 0, [ 'One\n', 'Two\n', 'Three\n' ], 3, 3, 3 ),
+            ( 1, 1, [ 'Two\n' ], 1, 1, 1 ),
+            ( 2, 1, [ 'Two\n', 'Three\n' ], 2, 2, 2 ),
+            ( 3, 1, [ 'Two\n', 'Three\n' ], 2, 2, 2 ),
+            ( 1, 2, [ 'Three\n' ], 1, 1, 1 ),
+            ( 2, 2, [ 'Three\n' ], 1, 1, 1 ),
+            ( 3, 2, [ 'Three\n' ], 1, 1, 1 ),
+        ]
+        for test in test_data:
+            log.debug( 'limit_offset_combo: %s', ', '.join([ str( e ) for e in test ]) )
+            limit_offset_combo( *test )
+
+    def test_limit_with_offset_and_filter( self ):
+        def limit_offset_combo( limit, offset, data_should_be, read, valid, returned ):
+            def only_ts( string ):
+                if not string.lower().startswith( 't' ):
+                    return None
+                return string
+            ( contents, provider, data ) = self.contents_provider_and_data(
+                limit=limit, offset=offset, filter_fn=only_ts )
+            self.assertEqual( data, data_should_be )
+            #self.assertCounters( provider, read, valid, returned )
+        test_data = [
+            ( 0, 0, [], 0, 0, 0 ),
+            ( 1, 0, [ 'Two\n' ], 1, 1, 1 ),
+            ( 2, 0, [ 'Two\n', 'Three\n' ], 2, 2, 2 ),
+            ( 3, 0, [ 'Two\n', 'Three\n' ], 2, 2, 2 ),
+            ( 1, 1, [ 'Three\n' ], 1, 1, 1 ),
+            ( 2, 1, [ 'Three\n' ], 1, 1, 1 ),
+            ( 1, 2, [], 0, 0, 0 ),
+        ]
+        for test in test_data:
+            log.debug( 'limit_offset_combo: %s', ', '.join([ str( e ) for e in test ]) )
+            limit_offset_combo( *test )
+
+
+class Test_MultiSourceDataProvider( BaseTestCase ):
+    provider_class = dataproviders.base.MultiSourceDataProvider
+
+    def contents_and_tmpfile( self, contents=None ):
+        #TODO: hmmmm...
+        contents = contents or self.default_file_contents
+        contents = utility.clean_multiline_string( contents )
+        return ( contents, self.tmpfiles.create_tmpfile( contents ) )
+
+    def test_multiple_sources( self ):
+        # clean the following contents, write them to tmpfiles, open them,
+        # and pass as a list to the provider
+        contents = [
+            """
+                One
+                Two
+                Three
+                Four
+                Five
+            """,
+            """
+                Six
+                Seven
+                Eight
+                Nine
+                Ten
+            """,
+            """
+                Eleven
+                Twelve! (<-- http://youtu.be/JZshZp-cxKg)
+            """
+        ]
+        contents = [ utility.clean_multiline_string( c ) for c in contents ]
+        source_list = [ open( self.tmpfiles.create_tmpfile( c ) ) for c in contents ]
+
+        provider = self.provider_class( source_list )
+        log.debug( 'provider: %s', provider )
+        data = list( provider )
+        log.debug( 'data: %s', str( data ) )
+        self.assertEqual( ''.join( data ), ''.join( contents ) )
+
+    def test_multiple_compound_sources( self ):
+        # clean the following contents, write them to tmpfiles, open them,
+        # wrap them in other providers, and pass those as a list to the provider
+        contents = [
+            """
+                One
+                Two
+                Three
+                Four
+                Five
+            """,
+            """
+                Six
+                Seven
+                Eight
+                Nine
+                Ten
+            """,
+            """
+                Eleven
+                Twelve! (<-- http://youtu.be/JZshZp-cxKg)
+            """
+        ]
+        contents = [ utility.clean_multiline_string( c ) for c in contents ]
+        source_list = [ open( self.tmpfiles.create_tmpfile( c ) ) for c in contents ]
+
+        def no_Fs( string ):
+            return None if string.startswith( 'F' ) else string
+        def no_youtube( string ):
+            return None if ( 'youtu.be' in string ) else string
+        source_list = [
+            dataproviders.base.LimitedOffsetDataProvider( source_list[0], filter_fn=no_Fs, limit=2, offset=1 ),
+            dataproviders.base.LimitedOffsetDataProvider( source_list[1], limit=1, offset=3 ),
+            dataproviders.base.FilteredDataProvider( source_list[2], filter_fn=no_youtube ),
+        ]
+        provider = self.provider_class( source_list )
+        log.debug( 'provider: %s', provider )
+        data = list( provider )
+        log.debug( 'data: %s', str( data ) )
+        self.assertEqual( ''.join( data ), 'Two\nThree\nNine\nEleven\n' )
+
+
+if __name__ == '__main__':
+    unittest.main()
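
The compound-source test above leans on a property worth noting: validate_source only requires an '__iter__' attribute, and providers are themselves iterable, so a provider can wrap another provider just as it wraps a file. A standalone sketch of that nesting, assuming <galaxy>/lib is on sys.path (e.g. via utility.add_galaxy_lib_to_path); the input filename is hypothetical:

    from galaxy.datatypes import dataproviders

    source = open( 'input.txt' )  # hypothetical input file
    # skip the first line, then yield at most two of the remaining lines
    provider = dataproviders.base.LimitedOffsetDataProvider( source, limit=2, offset=1 )
    # filter_fn returns None to drop a line, as in the tests above
    no_blanks = dataproviders.base.FilteredDataProvider(
        provider, filter_fn=lambda line: line if line.strip() else None )
    print list( no_blanks )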
diff -r 4b5f8a81adf16369d828aa82fe82f8141772c1d3 -r edef166c163f30175e98ddb0b98f7993d7d4d5eb test/unit/datatypes/dataproviders/utility.py
--- /dev/null
+++ b/test/unit/datatypes/dataproviders/utility.py
@@ -0,0 +1,45 @@
+"""
+Unit test utilities.
+"""
+
+import os
+import sys
+import logging
+import textwrap
+
+def set_up_filelogger( logname, level=logging.DEBUG ):
+    """
+    Sets up logging to a file named `logname`
+    (removing it first if it already exists).
+
+    Usable with 'nosetests' to get logging msgs from failed tests
+    (no logfile created).
+    Usable with 'nosetests --nologcapture' to get logging msgs for all tests
+    (in logfile).
+    """
+    if os.path.exists( logname ):
+        os.unlink( logname )
+    logging.basicConfig( filename=logname, level=level )
+    return logging
+
+def add_galaxy_lib_to_path( this_dir_relative_to_root ):
+    """
+    Adds `<galaxy>/lib` to `sys.path` given the script's directory relative
+    to `<galaxy>`.
+    Example::
+        utility.add_galaxy_lib_to_path( '/test/unit/datatypes/dataproviders' )
+    """
+    glx_lib = os.path.join( os.getcwd().replace( this_dir_relative_to_root, '' ), 'lib' )
+    sys.path.append( glx_lib )
+
+def clean_multiline_string( multiline_string, sep='\n' ):
+    """
+    Dedent, split, remove first and last empty lines, rejoin.
+    """
+    multiline_string = textwrap.dedent( multiline_string )
+    string_list = multiline_string.split( sep )
+    if not string_list[0]:
+        string_list = string_list[1:]
+    if not string_list[-1]:
+        string_list = string_list[:-1]
+    return ''.join([ ( s + '\n' ) for s in string_list ])
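
For reference, this is what utility.clean_multiline_string does to the indented triple-quoted fixtures used throughout the tests; a small worked example (the expected value follows from the implementation above):

    import utility

    raw = """
        One
        Two
        """
    # dedented, leading/trailing empty lines dropped, one '\n' per line
    assert utility.clean_multiline_string( raw ) == 'One\nTwo\n'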
diff -r 4b5f8a81adf16369d828aa82fe82f8141772c1d3 -r edef166c163f30175e98ddb0b98f7993d7d4d5eb test/unit/test_dataproviders.pyc
Binary file test/unit/test_dataproviders.pyc has changed

diff -r 4b5f8a81adf16369d828aa82fe82f8141772c1d3 -r edef166c163f30175e98ddb0b98f7993d7d4d5eb test/unit/test_tool_loader.pyc
Binary file test/unit/test_tool_loader.pyc has changed

Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--

This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.