commit/galaxy-central: carlfeberhard: Dataproviders: (wip) create a system of pipe-able iterators that can produce data for another source to consume, controlling/filtering/limiting/offsetting the output; Datatypes: use dataprovider decorators to allow datatypes to have factory methods that can create providers using their data as a source and their metadata as control
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/0959eb80de7e/ Changeset: 0959eb80de7e User: carlfeberhard Date: 2013-06-17 19:24:10 Summary: Dataproviders: (wip) create a system of pipe-able iterators that can produce data for another source to consume, controlling/filtering/limiting/offsetting the output; Datatypes: use dataprovider decorators to allow datatypes to have factory methods that can create providers using their data as a source and their metadata as control Affected #: 14 files diff -r 5b1f643e9ea4da1dfda2859ef96d734f455c8972 -r 0959eb80de7e42f4bb7d53e44e48583849bf1b81 lib/galaxy/datatypes/binary.py --- a/lib/galaxy/datatypes/binary.py +++ b/lib/galaxy/datatypes/binary.py @@ -22,6 +22,7 @@ from galaxy.datatypes.metadata import MetadataElement from galaxy.datatypes import metadata from galaxy.datatypes.sniff import * +import dataproviders log = logging.getLogger(__name__) @@ -74,6 +75,7 @@ trans.response.headers["Content-Disposition"] = 'attachment; filename="Galaxy%s-[%s].%s"' % (dataset.hid, fname, to_ext) return open( dataset.file_name ) + class Ab1( Binary ): """Class describing an ab1 binary sequence file""" file_ext = "ab1" @@ -93,12 +95,15 @@ Binary.register_unsniffable_binary_ext("ab1") + class GenericAsn1Binary( Binary ): """Class for generic ASN.1 binary format""" file_ext = "asn1-binary" Binary.register_unsniffable_binary_ext("asn1-binary") + +@dataproviders.decorators.has_dataproviders class Bam( Binary ): """Class describing a BAM binary file""" file_ext = "bam" @@ -255,9 +260,92 @@ return dataset.peek except: return "Binary bam alignments file (%s)" % ( data.nice_size( dataset.get_size() ) ) + + # ------------- Dataproviders + # pipe through samtools view + #ALSO: (as Sam) + # bam does not use '#' to indicate comments/headers - we need to strip out those headers from the std. providers + #TODO:?? seems like there should be an easier way to do/inherit this - metadata.comment_char? + #TODO: incorporate samtools options to control output: regions first, then flags, etc. 
+ @dataproviders.decorators.dataprovider_factory( 'line' ) + def line_dataprovider( self, dataset, **settings ): + samtools_source = dataproviders.dataset.SamtoolsDataProvider( dataset ) + settings[ 'comment_char' ] = '@' + return dataproviders.line.FilteredLineDataProvider( samtools_source, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'regex-line' ) + def regex_line_dataprovider( self, dataset, **settings ): + samtools_source = dataproviders.dataset.SamtoolsDataProvider( dataset ) + settings[ 'comment_char' ] = '@' + return dataproviders.line.RegexLineDataProvider( samtools_source, **settings ) + @dataproviders.decorators.dataprovider_factory( 'column' ) + def column_dataprovider( self, dataset, **settings ): + samtools_source = dataproviders.dataset.SamtoolsDataProvider( dataset ) + settings[ 'comment_char' ] = '@' + return dataproviders.column.ColumnarDataProvider( samtools_source, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'map' ) + def map_dataprovider( self, dataset, **settings ): + samtools_source = dataproviders.dataset.SamtoolsDataProvider( dataset ) + settings[ 'comment_char' ] = '@' + return dataproviders.column.MapDataProvider( samtools_source, **settings ) + + # these can't be used directly - may need BamColumn, BamMap (Bam metadata -> column/map) + # OR - see genomic_region_dataprovider + #@dataproviders.decorators.dataprovider_factory( 'dataset-column' ) + #def dataset_column_dataprovider( self, dataset, **settings ): + # settings[ 'comment_char' ] = '@' + # return super( Sam, self ).dataset_column_dataprovider( dataset, **settings ) + + #@dataproviders.decorators.dataprovider_factory( 'dataset-map' ) + #def dataset_map_dataprovider( self, dataset, **settings ): + # settings[ 'comment_char' ] = '@' + # return super( Sam, self ).dataset_map_dataprovider( dataset, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'header' ) + def header_dataprovider( self, dataset, **settings ): + # in this case we can use an option of samtools view to provide just what we need (w/o regex) + samtools_source = dataproviders.dataset.SamtoolsDataProvider( dataset, '-H' ) + return dataproviders.line.RegexLineDataProvider( samtools_source, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'id-seq-qual' ) + def id_seq_qual_dataprovider( self, dataset, **settings ): + settings[ 'indeces' ] = [ 0, 9, 10 ] + settings[ 'column_types' ] = [ 'str', 'str', 'str' ] + settings[ 'column_names' ] = [ 'id', 'seq', 'qual' ] + return self.map_dataprovider( dataset, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'genomic-region' ) + def genomic_region_dataprovider( self, dataset, **settings ): + # GenomicRegionDataProvider currently requires a dataset as source - may not be necc. + #TODO:?? consider (at least) the possible use of a kwarg: metadata_source (def. to source.dataset), + # or remove altogether... 
+ #samtools_source = dataproviders.dataset.SamtoolsDataProvider( dataset ) + #return dataproviders.dataset.GenomicRegionDataProvider( samtools_source, metadata_source=dataset, + # 2, 3, 3, **settings ) + + # instead, set manually and use in-class column gen + settings[ 'indeces' ] = [ 2, 3, 3 ] + settings[ 'column_types' ] = [ 'str', 'int', 'int' ] + return self.column_dataprovider( dataset, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'genomic-region-map' ) + def genomic_region_map_dataprovider( self, dataset, **settings ): + settings[ 'indeces' ] = [ 2, 3, 3 ] + settings[ 'column_types' ] = [ 'str', 'int', 'int' ] + settings[ 'column_names' ] = [ 'chrom', 'start', 'end' ] + return self.map_dataprovider( dataset, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'samtools' ) + def samtools_dataprovider( self, dataset, **settings ): + """Generic samtools interface - all options available through settings.""" + dataset_source = dataproviders.dataset.DatasetDataProvider( dataset ) + return dataproviders.dataset.SamtoolsDataProvider( dataset_source, **settings ) + Binary.register_sniffable_binary_format("bam", "bam", Bam) + class H5( Binary ): """Class describing an HDF5 file""" file_ext = "h5" @@ -277,6 +365,7 @@ Binary.register_unsniffable_binary_ext("h5") + class Scf( Binary ): """Class describing an scf binary sequence file""" file_ext = "scf" @@ -296,6 +385,7 @@ Binary.register_unsniffable_binary_ext("scf") + class Sff( Binary ): """ Standard Flowgram Format (SFF) """ file_ext = "sff" @@ -327,6 +417,7 @@ Binary.register_sniffable_binary_format("sff", "sff", Sff) + class BigWig(Binary): """ Accessing binary BigWig files from UCSC. @@ -363,6 +454,7 @@ Binary.register_sniffable_binary_format("bigwig", "bigwig", BigWig) + class BigBed(BigWig): """BigBed support from UCSC.""" @@ -375,6 +467,7 @@ Binary.register_sniffable_binary_format("bigbed", "bigbed", BigBed) + class TwoBit (Binary): """Class describing a TwoBit format nucleotide file""" diff -r 5b1f643e9ea4da1dfda2859ef96d734f455c8972 -r 0959eb80de7e42f4bb7d53e44e48583849bf1b81 lib/galaxy/datatypes/data.py --- a/lib/galaxy/datatypes/data.py +++ b/lib/galaxy/datatypes/data.py @@ -14,6 +14,8 @@ from galaxy.util.odict import odict from galaxy.util.sanitize_html import sanitize_html +import dataproviders + from galaxy import eggs eggs.require( "Paste" ) import paste @@ -56,6 +58,7 @@ cls.metadata_spec.update( base.metadata_spec ) #add contents of metadata spec of base class to cls metadata.Statement.process( cls ) +@dataproviders.decorators.has_dataproviders class Data( object ): """ Base class for all datatypes. Implements basic interfaces as well @@ -578,6 +581,40 @@ return [ 'trackster', 'circster' ] return [] + # ------------- Dataproviders + def has_dataprovider( self, data_format ): + """ + Returns True if `data_format` is available in `dataproviders`. + """ + return ( data_format in self.dataproviders ) + + def dataprovider( self, dataset, data_format, **settings ): + """ + Base dataprovider factory for all datatypes that returns the proper provider + for the given `data_format` or raises a `NoProviderAvailable`. + """ + #TODO:?? is this handling super class providers? 
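As a usage sketch of this dispatch (whose body continues just below) together with the Bam factories above: hypothetical code, not part of this changeset, assuming `dataset` is a Bam DatasetInstance and that the underlying line providers accept `limit`/`offset` settings:

    # hypothetical usage sketch; `dataset` is assumed to be a Bam DatasetInstance
    provider = dataset.datatype.dataprovider( dataset, 'id-seq-qual', limit=10, offset=100 )
    for read in provider:
        # the 'id-seq-qual' factory uses MapDataProvider, so each datum is a dict
        # with keys 'id', 'seq', 'qual'
        read_id, seq = read[ 'id' ], read[ 'seq' ]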
+ if self.has_dataprovider( data_format ): + return self.dataproviders[ data_format ]( self, dataset, **settings ) + raise dataproviders.exceptions.NoProviderAvailable( self, data_format ) + + @dataproviders.decorators.dataprovider_factory( 'base' ) + def base_dataprovider( self, dataset, **settings ): + dataset_source = dataproviders.dataset.DatasetDataProvider( dataset ) + return dataproviders.base.DataProvider( dataset_source, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'chunk' ) + def chunk_dataprovider( self, dataset, **settings ): + dataset_source = dataproviders.dataset.DatasetDataProvider( dataset ) + return dataproviders.chunk.ChunkDataProvider( dataset_source, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'chunk64' ) + def chunk64_dataprovider( self, dataset, **settings ): + dataset_source = dataproviders.dataset.DatasetDataProvider( dataset ) + return dataproviders.chunk.Base64ChunkDataProvider( dataset_source, **settings ) + + +@dataproviders.decorators.has_dataproviders class Text( Data ): file_ext = 'txt' line_class = 'line' @@ -747,10 +784,31 @@ f.close() split = classmethod(split) + # ------------- Dataproviders + @dataproviders.decorators.dataprovider_factory( 'line' ) + def line_dataprovider( self, dataset, **settings ): + """ + Returns an iterator over the dataset's lines (that have been `strip`ed) + optionally excluding blank lines and lines that start with a comment character. + """ + dataset_source = dataproviders.dataset.DatasetDataProvider( dataset ) + return dataproviders.line.FilteredLineDataProvider( dataset_source, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'regex-line' ) + def regex_line_dataprovider( self, dataset, **settings ): + """ + Returns an iterator over the dataset's lines + optionally including/excluding lines that match one or more regex filters. + """ + dataset_source = dataproviders.dataset.DatasetDataProvider( dataset ) + return dataproviders.line.RegexLineDataProvider( dataset_source, **settings ) + + class GenericAsn1( Text ): """Class for generic ASN.1 text format""" file_ext = 'asn1' + class LineCount( Text ): """ Dataset contains a single line with a single integer that denotes the @@ -758,6 +816,7 @@ """ pass + class Newick( Text ): """New Hampshire/Newick Format""" file_ext = "nhx" diff -r 5b1f643e9ea4da1dfda2859ef96d734f455c8972 -r 0959eb80de7e42f4bb7d53e44e48583849bf1b81 lib/galaxy/datatypes/dataproviders/__init__.py --- /dev/null +++ b/lib/galaxy/datatypes/dataproviders/__init__.py @@ -0,0 +1,28 @@ + +#TODO: ---- This is a work in progress ---- +""" +Dataproviders are iterators with context managers that provide data to some +consumer datum by datum. + +As well as subclassing and overriding to get the proper data, Dataproviders +can be piped from one to the other. +..example:: + +.. note:: be careful to NOT pipe providers into subclasses of those providers. + Subclasses provide all the functionality of their superclasses, + so there's generally no need. + +.. note:: be careful to when using piped providers that accept the same keywords + in their __init__ functions (such as limit or offset) to pass those + keywords to the proper (often final) provider. These errors that result + can be hard to diagnose. 
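The `..example::` stub in the docstring above is still empty in this WIP; a minimal sketch of the piping idea follows (the file name is hypothetical, and `limit`/`offset` are assumed to be accepted by the line providers, as the keyword-routing note above implies):

    # hypothetical: pipe a raw tab-separated file through a line provider, then a column provider
    from galaxy.datatypes import dataproviders

    source = open( 'scores.tsv' )
    lines = dataproviders.line.FilteredLineDataProvider( source, comment_char='#' )
    # per the note above, pass limit/offset to the final (outermost) provider in the pipe
    columns = dataproviders.column.ColumnarDataProvider( lines,
        indeces=[ 0, 2 ], column_types=[ 'str', 'float' ], limit=100, offset=10 )
    for name, score in columns:
        pass  # each datum is a list of parsed column values (None-padded if a column is missing)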
+""" +import decorators +import exceptions + +import base +import chunk +import line +import column +import external +import dataset diff -r 5b1f643e9ea4da1dfda2859ef96d734f455c8972 -r 0959eb80de7e42f4bb7d53e44e48583849bf1b81 lib/galaxy/datatypes/dataproviders/base.py --- /dev/null +++ b/lib/galaxy/datatypes/dataproviders/base.py @@ -0,0 +1,260 @@ +""" +Base class(es) for all DataProviders. +""" +# there's a blurry line between functionality here and functionality in datatypes module +# attempting to keep parsing to a minimum here and focus on chopping/pagination/reformat(/filtering-maybe?) +# and using as much pre-computed info/metadata from the datatypes module as possible +# also, this shouldn't be a replacement/re-implementation of the tool layer +# (which provides traceability/versioning/reproducibility) + +from collections import deque +import exceptions + +_TODO = """ +hooks into datatypes (define providers inside datatype modules) as factories +capture tell() when provider is done + def stop( self ): self.endpoint = source.tell(); raise StopIteration() +implement __len__ sensibly where it can be (would be good to have where we're giving some progress - '100 of 300') + seems like sniffed files would have this info +unit tests +add datum entry/exit point methods: possibly decode, encode + or create a class that pipes source through - how would decode work then? + +icorporate existing visualization/dataproviders +some of the sources (esp. in datasets) don't need to be re-created + +YAGNI: InterleavingMultiSourceDataProvider, CombiningMultiSourceDataProvider +""" + +import logging +log = logging.getLogger( __name__ ) + + +# ----------------------------------------------------------------------------- base classes +class DataProvider( object ): + """ + Base class for all data providers. Data providers: + (a) have a source (which must be another file-like object) + (b) implement both the iterator and context manager interfaces + (c) do not allow write methods + (but otherwise implement the other file object interface methods) + """ + def __init__( self, source, **kwargs ): + """ + :param source: the source that this iterator will loop over. + (Should implement the iterable interface and ideally have the + context manager interface as well) + """ + self.source = self.validate_source( source ) + + def validate_source( self, source ): + """ + Is this a valid source for this provider? + + :raises InvalidDataProviderSource: if the source is considered invalid. + + Meant to be overridden in subclasses. + """ + if not source or not hasattr( source, '__iter__' ): + # that's by no means a thorough check + raise exceptions.InvalidDataProviderSource( source ) + return source + + #TODO: (this might cause problems later...) + #TODO: some providers (such as chunk's seek and read) rely on this... remove + def __getattr__( self, name ): + if name == 'source': + # if we're inside this fn, source hasn't been set - provide some safety just for this attr + return None + # otherwise, try to get the attr from the source - allows us to get things like provider.encoding, etc. 
+ if hasattr( self.source, name ): + return getattr( self.source, name ) + # raise the proper error + return self.__getattribute__( name ) + + # write methods should not be allowed + def truncate( self, size ): + raise NotImplementedError( 'Write methods are purposely disabled' ) + def write( self, string ): + raise NotImplementedError( 'Write methods are purposely disabled' ) + def writelines( self, sequence ): + raise NotImplementedError( 'Write methods are purposely disabled' ) + + #TODO: route read methods through next? + #def readline( self ): + # return self.next() + def readlines( self ): + return [ line for line in self ] + + # iterator interface + def __iter__( self ): + # it's generators all the way up, Timmy + with self as source: + for datum in self.source: + yield datum + def next( self ): + return self.source.next() + + # context manager interface + def __enter__( self ): + # make the source's context manager interface optional + if hasattr( self.source, '__enter__' ): + self.source.__enter__() + return self + def __exit__( self, *args ): + # make the source's context manager interface optional, call on source if there + if hasattr( self.source, '__exit__' ): + self.source.__exit__( *args ) + # alternately, call close() + elif hasattr( self.source, 'close' ): + self.source.close() + + def __str__( self ): + """ + String representation for easier debugging. + + Will call `__str__` on it's source so this will display piped dataproviders. + """ + # we need to protect against recursion (in __getattr__) if self.source hasn't been set + source_str = str( self.source ) if hasattr( self, 'source' ) else '' + return '%s(%s)' %( self.__class__.__name__, str( source_str ) ) + + +class FilteredDataProvider( DataProvider ): + """ + Passes each datum through a filter function and yields it if that function + returns a non-`None` value. + + Also maintains counters: + - `num_data_read`: how many data have been consumed from the source. + - `num_valid_data_read`: how many data have been returned from `filter`. + - `num_data_returned`: how many data has this provider yielded. + """ + def __init__( self, source, filter_fn=None, **kwargs ): + """ + :param filter_fn: a lambda or function that will be passed a datum and + return either the (optionally modified) datum or None. + """ + super( FilteredDataProvider, self ).__init__( source, **kwargs ) + self.filter_fn = filter_fn + # count how many data we got from the source + self.num_data_read = 0 + # how many valid data have we gotten from the source + # IOW, data that's passed the filter and been either provided OR have been skipped due to offset + self.num_valid_data_read = 0 + # how many lines have been provided/output + self.num_data_returned = 0 + + def __iter__( self ): + parent_gen = super( FilteredDataProvider, self ).__iter__() + for datum in parent_gen: + self.num_data_read += 1 + datum = self.filter( datum ) + if datum != None: + self.num_valid_data_read += 1 + self.num_data_returned += 1 + yield datum + + #TODO: may want to squash this into DataProvider + def filter( self, datum ): + """ + When given a datum from the provider's source, return None if the datum + 'does not pass' the filter or is invalid. Return the datum if it's valid. + + :param datum: the datum to check for validity. + :returns: the datum, a modified datum, or None + + Meant to be overridden. 
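A tiny, hypothetical illustration of the `filter_fn` hook described here, wrapping a plain list as the source:

    # hypothetical: drop blank strings and upper-case the rest
    nonblank = FilteredDataProvider( [ 'one', '', 'two' ],
        filter_fn=lambda d: d.upper() if d.strip() else None )
    assert list( nonblank ) == [ 'ONE', 'TWO' ]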
+ """ + if self.filter_fn: + return self.filter_fn( datum ) + # also can be overriden entirely + return datum + + +class LimitedOffsetDataProvider( FilteredDataProvider ): + """ + A provider that uses the counters from FilteredDataProvider to limit the + number of data and/or skip `offset` number of data before providing. + + Useful for grabbing sections from a source (e.g. pagination). + """ + #TODO: may want to squash this into DataProvider + def __init__( self, source, offset=0, limit=None, **kwargs ): + """ + :param offset: the number of data to skip before providing. + :param limit: the final number of data to provide. + """ + super( LimitedOffsetDataProvider, self ).__init__( source, **kwargs ) + + # how many valid data to skip before we start outputing data - must be positive + # (diff to support neg. indeces - must be pos.) + self.offset = max( offset, 0 ) + + # how many valid data to return - must be positive (None indicates no limit) + self.limit = limit + if self.limit != None: + self.limit = max( self.limit, 0 ) + + def __iter__( self ): + """ + Iterate over the source until `num_valid_data_read` is greater than + `offset`, begin providing datat, and stop when `num_data_returned` + is greater than `offset`. + """ + parent_gen = super( LimitedOffsetDataProvider, self ).__iter__() + for datum in parent_gen: + + if self.limit != None and self.num_data_returned > self.limit: + break + + if self.num_valid_data_read > self.offset: + yield datum + else: + # wot a cheezy way of doing this... + self.num_data_returned -= 1 + + #TODO: skipping lines is inefficient - somehow cache file position/line_num pair and allow provider + # to seek to a pos/line and then begin providing lines + # the important catch here is that we need to have accurate pos/line pairs + # in order to preserve the functionality of limit and offset + #if file_seek and len( file_seek ) == 2: + # seek_pos, new_line_num = file_seek + # self.seek_and_set_curr_line( seek_pos, new_line_num ) + + #def seek_and_set_curr_line( self, file_seek, new_curr_line_num ): + # self.seek( file_seek, os.SEEK_SET ) + # self.curr_line_num = new_curr_line_num + + +class MultiSourceDataProvider( DataProvider ): + """ + A provider that iterates over a list of given sources and provides data + from one after another. + + An iterator over iterators. + """ + def __init__( self, source_list, **kwargs ): + """ + :param source_list: an iterator of iterables + """ + self.source_list = deque( source_list ) + + def __iter__( self ): + """ + Iterate over the source_list, then iterate over the data in each source. + + Skip a given source in `source_list` if it is `None` or invalid. + """ + for source in self.source_list: + # just skip falsy sources + if not source: + continue + try: + self.source = self.validate_source( source ) + except exceptions.InvalidDataProviderSource, invalid_source: + continue + + parent_gen = super( MultiSourceDataProvider, self ).__iter__() + for datum in parent_gen: + yield datum diff -r 5b1f643e9ea4da1dfda2859ef96d734f455c8972 -r 0959eb80de7e42f4bb7d53e44e48583849bf1b81 lib/galaxy/datatypes/dataproviders/chunk.py --- /dev/null +++ b/lib/galaxy/datatypes/dataproviders/chunk.py @@ -0,0 +1,80 @@ +""" +Chunk (N number of bytes at M offset to a source's beginning) provider. + +Primarily for file sources but usable by any iterator that has both +seek and read( N ). 
+""" +import os +import base64 + +import base +import exceptions + +_TODO = """ +""" + +import logging +log = logging.getLogger( __name__ ) + + +# ----------------------------------------------------------------------------- +class ChunkDataProvider( base.DataProvider ): + """ + Data provider that yields chunks of data from it's file. + + Note: this version does not account for lines and works with Binary datatypes. + """ + MAX_CHUNK_SIZE = 2**16 + DEFAULT_CHUNK_SIZE = MAX_CHUNK_SIZE + + #TODO: subclass from LimitedOffsetDataProvider? + # see web/framework/base.iterate_file, util/__init__.file_reader, and datatypes.tabular + def __init__( self, source, chunk_index=0, chunk_size=DEFAULT_CHUNK_SIZE, **kwargs ): + """ + :param chunk_index: if a source can be divided into N number of + `chunk_size` sections, this is the index of which section to + return. + :param chunk_size: how large are the desired chunks to return + (gen. in bytes). + """ + super( ChunkDataProvider, self ).__init__( source, **kwargs ) + self.chunk_size = chunk_size + self.chunk_pos = chunk_index * self.chunk_size + + def validate_source( self, source ): + """ + Does the given source have both the methods `seek` and `read`? + :raises InvalidDataProviderSource: if not. + """ + source = super( ChunkDataProvider, self ).validate_source( source ) + if( ( not hasattr( source, 'seek' ) ) + or ( not hasattr( source, 'read' ) ) ): + raise exceptions.InvalidDataProviderSource( source ) + return source + + def __iter__( self ): + # not reeeally an iterator per se + self.__enter__() + self.source.seek( self.chunk_pos, os.SEEK_SET ) + chunk = self.encode( self.source.read( self.chunk_size ) ) + yield chunk + self.__exit__() + + def encode( self, chunk ): + """ + Called on the chunk before returning. + + Overrride to modify, encode, or decode chunks. + """ + return chunk + + +class Base64ChunkDataProvider( ChunkDataProvider ): + """ + Data provider that yields chunks of base64 encoded data from it's file. + """ + def encode( self, chunk ): + """ + Return chunks encoded in base 64. + """ + return base64.b64encode( chunk ) diff -r 5b1f643e9ea4da1dfda2859ef96d734f455c8972 -r 0959eb80de7e42f4bb7d53e44e48583849bf1b81 lib/galaxy/datatypes/dataproviders/column.py --- /dev/null +++ b/lib/galaxy/datatypes/dataproviders/column.py @@ -0,0 +1,242 @@ +""" +Providers that provide lists of lists generally where each line of a source +is further subdivided into multiple data (e.g. columns from a line). +""" + +import line + +_TODO = """ +move ColumnarDataProvider parsers to more sensible location + +TransposedColumnarDataProvider: provides each column as a single array + - see existing visualizations/dataprovider/basic.ColumnDataProvider +""" + +import logging +log = logging.getLogger( __name__ ) + + +# ----------------------------------------------------------------------------- base classes +class ColumnarDataProvider( line.RegexLineDataProvider ): + """ + Data provider that provide a list of columns from the lines of it's source. + + Columns are returned in the order given in indeces, so this provider can + re-arrange columns. + + If any desired index is outside the actual number of columns + in the source, this provider will None-pad the output and you are guaranteed + the same number of columns as the number of indeces asked for (even if they + are filled with None). 
+ """ + def __init__( self, source, indeces=None, + column_count=None, column_types=None, parsers=None, parse_columns=True, + deliminator='\t', **kwargs ): + """ + :param indeces: a list of indeces of columns to gather from each row + Optional: will default to `None`. + If `None`, this provider will return all rows (even when a + particular row contains more/less than others). + If a row/line does not contain an element at a given index, the + provider will-return/fill-with a `None` value as the element. + :type indeces: list or None + + :param column_count: an alternate means of defining indeces, use an int + here to effectively provide the first N columns. + Optional: will default to `None`. + :type column_count: int + + :param column_types: a list of string names of types that the + provider will use to look up an appropriate parser for the column. + (e.g. 'int', 'float', 'str', 'bool') + Optional: will default to parsing all columns as strings. + :type column_types: list of strings + + :param parsers: a dictionary keyed with column type strings + and with values that are functions to use when parsing those + types. + Optional: will default to using the function `_get_default_parsers`. + :type parsers: dictionary + + :param parse_columns: attempt to parse columns? + Optional: defaults to `True`. + :type parse_columns: bool + + :param deliminator: character(s) used to split each row/line of the source. + Optional: defaults to the tab character. + :type deliminator: str + + .. note: that the subclass constructors are passed kwargs - so they're + params (limit, offset, etc.) are also applicable here. + """ + #TODO: other columnar formats: csv, etc. + super( ColumnarDataProvider, self ).__init__( source, **kwargs ) + + #IMPLICIT: if no indeces, column_count, or column_types passed: return all columns + self.selected_column_indeces = indeces + self.column_count = column_count + self.column_types = column_types or [] + # if no column count given, try to infer from indeces or column_types + if not self.column_count: + if self.selected_column_indeces: + self.column_count = len( self.selected_column_indeces ) + elif self.column_types: + self.column_count = len( self.column_types ) + # if no indeces given, infer from column_count + if not self.selected_column_indeces and self.column_count: + self.selected_column_indeces = list( xrange( self.column_count ) ) + + self.deliminator = deliminator + + # how/whether to parse each column value + self.parsers = {} + if parse_columns: + self.parsers = self._get_default_parsers() + # overwrite with user desired parsers + self.parsers.update( parsers or {} ) + + def _get_default_parsers( self ): + """ + Return parser dictionary keyed for each columnar type + (as defined in datatypes). + + .. note: primitives only by default (str, int, float, boolean, None). + Other (more complex) types are retrieved as strings. + :returns: a dictionary of the form: + `{ <parser type name> : <function used to parse type> }` + """ + #TODO: move to module level (or datatypes, util) + return { + # str is default and not needed here + 'int' : int, + 'float' : float, + 'bool' : bool, + + # unfortunately, 'list' is used in dataset metadata both for + # query style maps (9th col gff) AND comma-sep strings. + # (disabled for now) + #'list' : lambda v: v.split( ',' ), + #'csv' : lambda v: v.split( ',' ), + ## i don't like how urlparses does sub-lists... 
+ #'querystr' : lambda v: dict([ ( p.split( '=', 1 ) if '=' in p else ( p, True ) ) + # for p in v.split( ';', 1 ) ]) + + #'scifloat': #floating point which may be in scientific notation + + # always with the 1 base, biologists? + #'int1' : ( lambda i: int( i ) - 1 ), + + #'gffval': string or '.' for None + #'gffint': # int or '.' for None + #'gffphase': # 0, 1, 2, or '.' for None + #'gffstrand': # -, +, ?, or '.' for None, etc. + } + + def _parse_value( self, val, type ): + """ + Attempt to parse and return the given value based on the given type. + + :param val: the column value to parse (often a string) + :param type: the string type 'name' used to find the appropriate parser + :returns: the parsed value + or `value` if no `type` found in `parsers` + or `None` if there was a parser error (ValueError) + """ + if type == 'str' or type == None: return val + try: + return self.parsers[ type ]( val ) + except KeyError, err: + # no parser - return as string + pass + except ValueError, err: + # bad value - return None + return None + return val + + def _get_column_type( self, index ): + """ + Get the column type for the parser from `self.column_types` or `None` + if the type is unavailable. + :param index: the column index + :returns: string name of type (e.g. 'float', 'int', etc.) + """ + try: + return self.column_types[ index ] + except IndexError, ind_err: + return None + + def _parse_column_at_index( self, columns, parser_index, index ): + """ + Get the column type for the parser from `self.column_types` or `None` + if the type is unavailable. + """ + try: + return self._parse_value( columns[ index ], self._get_column_type( parser_index ) ) + # if a selected index is not within columns, return None + except IndexError, index_err: + return None + + def _parse_columns_from_line( self, line ): + """ + Returns a list of the desired, parsed columns. + :param line: the line to parse + :type line: str + """ + #TODO: too much going on in this loop - the above should all be precomputed AMAP... + all_columns = line.split( self.deliminator ) + # if no indeces were passed to init, return all columns + selected_indeces = self.selected_column_indeces or list( xrange( len( all_columns ) ) ) + parsed_columns = [] + for parser_index, column_index in enumerate( selected_indeces ): + parsed_columns.append( self._parse_column_at_index( all_columns, parser_index, column_index ) ) + return parsed_columns + + def __iter__( self ): + parent_gen = super( ColumnarDataProvider, self ).__iter__() + for line in parent_gen: + columns = self._parse_columns_from_line( line ) + yield columns + + #TODO: implement column filters here and not below - flatten hierarchy + +class FilteredByColumnDataProvider( ColumnarDataProvider ): + """ + Data provider that provide a list of columns from the lines of it's source + _only_ if they pass a given filter function. + + e.g. column #3 is type int and > N + """ + # TODO: how to do this and still have limit and offset work? + def __init__( self, source, **kwargs ): + raise NotImplementedError() + super( FilteredByColumnDataProvider, self ).__init__( source, **kwargs ) + + +class MapDataProvider( ColumnarDataProvider ): + """ + Data provider that column_names and columns from the source's contents + into a dictionary. + + A combination use of both `column_names` and `indeces` allows 'picking' + key/value pairs from the source. + + .. note: that the subclass constructors are passed kwargs - so they're + params (limit, offset, etc.) are also applicable here. 
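A hypothetical sketch of the 'picking' behavior described above, using the `column_names` keyword defined just below (the file and its column layout are assumed):

    # hypothetical: key columns 0 and 2 of a tab-separated file as 'chrom' and 'depth'
    picker = MapDataProvider( open( 'coverage.tsv' ),
        indeces=[ 0, 2 ], column_types=[ 'str', 'int' ],
        column_names=[ 'chrom', 'depth' ] )
    # yields one dict per line, e.g. { 'chrom' : 'chr1', 'depth' : 42 }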
+ """ + def __init__( self, source, column_names=None, **kwargs ): + """ + :param column_names: an ordered list of strings that will be used as the keys + for each column in the returned dictionaries. + The number of key, value pairs each returned dictionary has will + be as short as the number of column names provided. + :type column_names: + """ + #TODO: allow passing in a map instead of name->index { 'name1': index1, ... } + super( MapDataProvider, self ).__init__( source, **kwargs ) + self.column_names = column_names or [] + + def __iter__( self ): + parent_gen = super( MapDataProvider, self ).__iter__() + for column_values in parent_gen: + map = dict( zip( self.column_names, column_values ) ) + yield map diff -r 5b1f643e9ea4da1dfda2859ef96d734f455c8972 -r 0959eb80de7e42f4bb7d53e44e48583849bf1b81 lib/galaxy/datatypes/dataproviders/dataset.py --- /dev/null +++ b/lib/galaxy/datatypes/dataproviders/dataset.py @@ -0,0 +1,671 @@ +""" +Dataproviders that use either: + - the file contents and/or metadata from a Galaxy DatasetInstance as + their source. + - or provide data in some way relevant to bioinformatic data + (e.g. parsing genomic regions from their source) +""" + +import pkg_resources +pkg_resources.require( 'bx-python' ) +from bx import seq as bx_seq +from bx import wiggle as bx_wig + +import galaxy.model +import galaxy.datatypes +import galaxy.datatypes.data + +#TODO: can't import these due to circular ref in model/registry +#import galaxy.datatypes.binary +#import galaxy.datatypes.tabular + +import exceptions +import base +import line +import column +import external + +_TODO = """ +use bx as much as possible +the use of DatasetInstance seems to create some import problems +gff3 hierarchies +""" + +import logging +log = logging.getLogger( __name__ ) + + +# ----------------------------------------------------------------------------- base for using a Glx dataset +class DatasetDataProvider( base.DataProvider ): + """ + Class that uses the file contents and/or metadata from a Galaxy DatasetInstance + as it's source. + + DatasetDataProvider can be seen as the intersection between a datatype's + metadata and a dataset's file contents. It (so far) mainly provides helper + and conv. methods for using dataset metadata to set up and control how + the data is provided. + """ + def __init__( self, dataset, **kwargs ): + """ + :param dataset: the Galaxy dataset whose file will be the source + :type dataset: model.DatasetInstance + + :raises exceptions.InvalidDataProviderSource: if not a DatsetInstance + """ + if not isinstance( dataset, galaxy.model.DatasetInstance ): + raise exceptions.InvalidDataProviderSource( "Data provider can only be used with a DatasetInstance" ) + self.dataset = dataset + # this dataset file is obviously the source + #TODO: this might be a good place to interface with the object_store... + super( DatasetDataProvider, self ).__init__( open( dataset.file_name, 'rb' ) ) + + #TODO: this is a bit of a mess + @classmethod + def get_column_metadata_from_dataset( cls, dataset ): + """ + Convenience class method to get column metadata from a dataset. + :returns: a dictionary of `column_count`, `column_types`, and `column_names` + if they're available, setting each to `None` if not. 
+ """ + # re-map keys to fit ColumnarProvider.__init__ kwargs + params = {} + params[ 'column_count' ] = dataset.metadata.columns + params[ 'column_types' ] = dataset.metadata.column_types + params[ 'column_names' ] = dataset.metadata.column_names or getattr( dataset.datatype, 'column_names', None ) + return params + + def get_metadata_column_types( self, indeces=None ): + """ + Return the list of `column_types` for this dataset or `None` if unavailable. + :param indeces: the indeces for the columns of which to return the types. + Optional: defaults to None (return all types) + :type indeces: list of ints + """ + metadata_column_types = ( self.dataset.metadata.column_types + or getattr( self.dataset.datatype, 'column_types', None ) + or None ) + if not metadata_column_types: + return metadata_column_types + if indeces: + column_types = [] + for index in indeces: + column_type = metadata_column_types[ index ] if index < len( metadata_column_types ) else None + column_types.append( column_type ) + return column_types + return metadata_column_types + + def get_metadata_column_names( self, indeces=None ): + """ + Return the list of `column_names` for this dataset or `None` if unavailable. + :param indeces: the indeces for the columns of which to return the names. + Optional: defaults to None (return all names) + :type indeces: list of ints + """ + metadata_column_names = ( self.dataset.metadata.column_names + or getattr( self.dataset.datatype, 'column_names', None ) + or None ) + if not metadata_column_names: + return metadata_column_names + if indeces: + column_names = [] + for index in indeces: + column_type = metadata_column_names[ index ] if index < len( metadata_column_names ) else None + column_names.append( column_type ) + return column_names + return metadata_column_names + + #TODO: merge the next two + def get_indeces_by_column_names( self, list_of_column_names ): + """ + Return the list of column indeces when given a list of column_names. + :param list_of_column_names: the names of the columns of which to get indeces. + :type list_of_column_names: list of strs + :raises KeyError: if column_names are not found + :raises ValueError: if an entry in list_of_column_names is not in column_names + """ + metadata_column_names = ( self.dataset.metadata.column_names + or getattr( self.dataset.datatype, 'column_names', None ) + or None ) + if not metadata_column_names: + raise KeyError( 'No column_names found for ' + + 'datatype: %s, dataset: %s' %( str( self.dataset.datatype ), str( self.dataset ) ) ) + indeces = [] + for column_name in list_of_column_names: + indeces.append( metadata_column_names.index( column_name ) ) + return indeces + + def get_metadata_column_index_by_name( self, name ): + """ + Return the 1-base index of a sources column with the given `name`. + """ + # metadata columns are 1-based indeces + column = getattr( self.dataset.metadata, name ) + return ( column - 1 ) if isinstance( column, int ) else None + + def get_genomic_region_indeces( self, check=False ): + """ + Return a list of column indeces for 'chromCol', 'startCol', 'endCol' from + a source representing a genomic region. + + :param check: if True will raise a ValueError if any were not found. + :type check: bool + :raises ValueError: if check is `True` and one or more indeces were not found. + :returns: list of column indeces for the named columns. 
+ """ + region_column_names = ( 'chromCol', 'startCol', 'endCol' ) + region_indeces = [ self.get_metadata_column_index_by_name( name ) for name in region_column_names ] + if check and not all( map( lambda i: i != None, indeces ) ): + raise ValueError( "Could not determine proper column indeces for chrom, start, end: %s" %( str( indeces ) ) ) + return region_indeces + + +class ConvertedDatasetDataProvider( DatasetDataProvider ): + """ + Class that uses the file contents of a dataset after conversion to a different + format. + """ + def __init__( self, dataset, **kwargs ): + raise NotImplementedError( 'Abstract class' ) + self.original_dataset = dataset + self.converted_dataset = self.convert_dataset( dataset, **kwargs ) + super( ConvertedDatasetDataProvider, self ).__init__( self.converted_dataset, **kwargs ) + #NOTE: now self.converted_dataset == self.dataset + + def convert_dataset( self, dataset, **kwargs ): + """ + Convert the given dataset in some way. + """ + return dataset + + +# ----------------------------------------------------------------------------- uses metadata for settings +class DatasetColumnarDataProvider( column.ColumnarDataProvider ): + """ + Data provider that uses a DatasetDataProvider as it's source and the + dataset's metadata to buuild settings for the ColumnarDataProvider it's + inherited from. + """ + def __init__( self, dataset, **kwargs ): + """ + All kwargs are inherited from ColumnarDataProvider. + .. seealso:: column.ColumnarDataProvider + + If no kwargs are given, this class will attempt to get those kwargs + from the dataset source's metadata. + If any kwarg is given, it will override and be used in place of + any metadata available. + """ + dataset_source = DatasetDataProvider( dataset ) + if not kwargs.get( 'column_types', None ): + indeces = kwargs.get( 'indeces', None ) + kwargs[ 'column_types' ] = dataset_source.get_metadata_column_types( indeces=indeces ) + super( DatasetColumnarDataProvider, self ).__init__( dataset_source, **kwargs ) + + +class DatasetMapDataProvider( column.MapDataProvider ): + """ + Data provider that uses a DatasetDataProvider as it's source and the + dataset's metadata to buuild settings for the MapDataProvider it's + inherited from. + """ + def __init__( self, dataset, **kwargs ): + """ + All kwargs are inherited from MapDataProvider. + .. seealso:: column.MapDataProvider + + If no kwargs are given, this class will attempt to get those kwargs + from the dataset source's metadata. + If any kwarg is given, it will override and be used in place of + any metadata available. 
+ + The relationship between column_names and indeces is more complex: + +-----------------+-------------------------------+-----------------------+ + | | Indeces given | Indeces NOT given | + +=================+===============================+=======================+ + | Names given | pull indeces, rename w/ names | pull by name | + +=================+-------------------------------+-----------------------+ + | Names NOT given | pull indeces, name w/ meta | pull all, name w/meta | + +=================+-------------------------------+-----------------------+ + """ + dataset_source = DatasetDataProvider( dataset ) + + #TODO: getting too complicated - simplify at some lvl, somehow + # if no column_types given, get column_types from indeces (or all if indeces == None) + indeces = kwargs.get( 'indeces', None ) + column_names = kwargs.get( 'column_names', None ) + + #if indeces and column_names: + # # pull using indeces and re-name with given names - no need to alter (does as super would) + # pass + + if not indeces and column_names: + # pull columns by name + indeces = kwargs[ 'indeces' ] = dataset_source.get_indeces_by_column_names( column_names ) + + elif indeces and not column_names: + # pull using indeces, name with meta + column_names = kwargs[ 'column_names' ] = dataset_source.get_metadata_column_names( indeces=indeces ) + + elif not indeces and not column_names: + # pull all indeces and name using metadata + column_names = kwargs[ 'column_names' ] = dataset_source.get_metadata_column_names( indeces=indeces ) + + # if no column_types given, use metadata column_types + if not kwargs.get( 'column_types', None ): + kwargs[ 'column_types' ] = dataset_source.get_metadata_column_types( indeces=indeces ) + + super( DatasetMapDataProvider, self ).__init__( dataset_source, **kwargs ) + + +# ----------------------------------------------------------------------------- provides a bio-relevant datum +class GenomicRegionDataProvider( column.ColumnarDataProvider ): + """ + Data provider that parses chromosome, start, and end data from a file + using the datasets metadata settings. + + Is a ColumnarDataProvider that uses a DatasetDataProvider as it's source. + + If `named_columns` is true, will return dictionaries with the keys + 'chrom', 'start', 'end'. + """ + # dictionary keys when named_columns=True + COLUMN_NAMES = [ 'chrom', 'start', 'end' ] + + def __init__( self, dataset, chrom_column=None, start_column=None, end_column=None, named_columns=False, **kwargs ): + """ + :param dataset: the Galaxy dataset whose file will be the source + :type dataset: model.DatasetInstance + + :param chrom_column: optionally specify the chrom column index + :type chrom_column: int + :param start_column: optionally specify the start column index + :type start_column: int + :param end_column: optionally specify the end column index + :type end_column: int + + :param named_columns: optionally return dictionaries keying each column + with 'chrom', 'start', or 'end'. + Optional: defaults to False + :type named_columns: bool + """ + #TODO: allow passing in a string format e.g. 
"{chrom}:{start}-{end}" + dataset_source = DatasetDataProvider( dataset ) + + if chrom_column == None: + chrom_column = dataset_source.get_metadata_column_index_by_name( 'chromCol' ) + if start_column == None: + start_column = dataset_source.get_metadata_column_index_by_name( 'startCol' ) + if end_column == None: + end_column = dataset_source.get_metadata_column_index_by_name( 'endCol' ) + indeces = [ chrom_column, start_column, end_column ] + if not all( map( lambda i: i != None, indeces ) ): + raise ValueError( "Could not determine proper column indeces for" + + " chrom, start, end: %s" %( str( indeces ) ) ) + kwargs.update({ 'indeces' : indeces }) + + if not kwargs.get( 'column_types', None ): + kwargs.update({ 'column_types' : dataset_source.get_metadata_column_types( indeces=indeces ) }) + + self.named_columns = named_columns + if self.named_columns: + self.column_names = self.COLUMN_NAMES + + super( GenomicRegionDataProvider, self ).__init__( dataset_source, **kwargs ) + + def __iter__( self ): + parent_gen = super( GenomicRegionDataProvider, self ).__iter__() + for column_values in parent_gen: + if self.named_columns: + yield dict( zip( self.column_names, column_values ) ) + else: + yield column_values + + +#TODO: this optionally provides the same data as the above and makes GenomicRegionDataProvider redundant +# GenomicRegionDataProvider is a better name, tho +class IntervalDataProvider( column.ColumnarDataProvider ): + """ + Data provider that parses chromosome, start, and end data (as well as strand + and name if set in the metadata) using the dataset's metadata settings. + + If `named_columns` is true, will return dictionaries with the keys + 'chrom', 'start', 'end' (and 'strand' and 'name' if available). + """ + COLUMN_NAMES = [ 'chrom', 'start', 'end', 'strand', 'name' ] + + def __init__( self, dataset, chrom_column=None, start_column=None, end_column=None, + strand_column=None, name_column=None, named_columns=False, **kwargs ): + """ + :param dataset: the Galaxy dataset whose file will be the source + :type dataset: model.DatasetInstance + + :param named_columns: optionally return dictionaries keying each column + with 'chrom', 'start', 'end', 'strand', or 'name'. + Optional: defaults to False + :type named_columns: bool + """ + #TODO: allow passing in a string format e.g. 
"{chrom}:{start}-{end}" + dataset_source = DatasetDataProvider( dataset ) + + # get genomic indeces and add strand and name + if chrom_column == None: + chrom_column = dataset_source.get_metadata_column_index_by_name( 'chromCol' ) + if start_column == None: + start_column = dataset_source.get_metadata_column_index_by_name( 'startCol' ) + if end_column == None: + end_column = dataset_source.get_metadata_column_index_by_name( 'endCol' ) + if strand_column == None: + strand_column = dataset_source.get_metadata_column_index_by_name( 'strandCol' ) + if name_column == None: + name_column = dataset_source.get_metadata_column_index_by_name( 'nameCol' ) + indeces = [ chrom_column, start_column, end_column, strand_column, name_column ] + kwargs.update({ 'indeces' : indeces }) + + if not kwargs.get( 'column_types', None ): + kwargs.update({ 'column_types' : dataset_source.get_metadata_column_types( indeces=indeces ) }) + + self.named_columns = named_columns + if self.named_columns: + self.column_names = self.COLUMN_NAMES + + super( IntervalDataProvider, self ).__init__( dataset_source, **kwargs ) + + def __iter__( self ): + parent_gen = super( IntervalDataProvider, self ).__iter__() + for column_values in parent_gen: + if self.named_columns: + yield dict( zip( self.column_names, column_values ) ) + else: + yield column_values + + +#TODO: ideally with these next two - you'd allow pulling some region from the sequence +# WITHOUT reading the entire seq into memory - possibly apply some version of limit/offset +class FastaDataProvider( base.FilteredDataProvider ): + """ + Class that returns fasta format data in a list of maps of the form: + { + id: <fasta header id>, + sequence: <joined lines of nucleotide/amino data> + } + """ + def __init__( self, source, ids=None, **kwargs ): + """ + :param ids: optionally return only ids (and sequences) that are in this list. + Optional: defaults to None (provide all ids) + :type ids: list or None + """ + source = bx_seq.fasta.FastaReader( source ) + #TODO: validate is a fasta + super( FastaDataProvider, self ).__init__( source, **kwargs ) + self.ids = ids + # how to do ids? + + def __iter__( self ): + parent_gen = super( FastaDataProvider, self ).__iter__() + for fasta_record in parent_gen: + yield { + 'id' : fasta_record.name, + 'seq' : fasta_record.text + } + + +class TwoBitFastaDataProvider( DatasetDataProvider ): + """ + Class that returns fasta format data in a list of maps of the form: + { + id: <fasta header id>, + sequence: <joined lines of nucleotide/amino data> + } + """ + def __init__( self, source, ids=None, **kwargs ): + """ + :param ids: optionally return only ids (and sequences) that are in this list. + Optional: defaults to None (provide all ids) + :type ids: list or None + """ + source = bx_seq.twobit.TwoBitFile( source ) + #TODO: validate is a 2bit + super( FastaDataProvider, self ).__init__( source, **kwargs ) + # could do in order provided with twobit + self.ids = ids or self.source.keys() + + def __iter__( self ): + for id_ in self.ids: + yield { + 'id' : id_, + 'seq' : self.source[ name ] + } + + +#TODO: +class WiggleDataProvider( base.LimitedOffsetDataProvider ): + """ + Class that returns chrom, pos, data from a wiggle source. + """ + COLUMN_NAMES = [ 'chrom', 'pos', 'value' ] + + def __init__( self, source, named_columns=False, column_names=None, **kwargs ): + """ + :param named_columns: optionally return dictionaries keying each column + with 'chrom', 'start', 'end', 'strand', or 'name'. 
+ Optional: defaults to False + :type named_columns: bool + + :param column_names: an ordered list of strings that will be used as the keys + for each column in the returned dictionaries. + The number of key, value pairs each returned dictionary has will + be as short as the number of column names provided. + :type column_names: + """ + #TODO: validate is a wig + # still good to maintain a ref to the raw source bc Reader won't + self.raw_source = source + self.parser = bx_wig.Reader( source ) + super( WiggleDataProvider, self ).__init__( self.parser, **kwargs ) + + self.named_columns = named_columns + self.column_names = column_names or self.COLUMN_NAMES + + def __iter__( self ): + parent_gen = super( WiggleDataProvider, self ).__iter__() + for three_tuple in parent_gen: + if self.named_columns: + yield dict( zip( self.column_names, three_tuple ) ) + else: + # list is not strictly necessary - but consistent + yield list( three_tuple ) + + +class BigWigDataProvider( base.LimitedOffsetDataProvider ): + """ + Class that returns chrom, pos, data from a wiggle source. + """ + COLUMN_NAMES = [ 'chrom', 'pos', 'value' ] + + def __init__( self, source, chrom, start, end, named_columns=False, column_names=None, **kwargs ): + """ + :param chrom: which chromosome within the bigbed file to extract data for + :type chrom: str + :param start: the start of the region from which to extract data + :type start: int + :param end: the end of the region from which to extract data + :type end: int + + :param named_columns: optionally return dictionaries keying each column + with 'chrom', 'start', 'end', 'strand', or 'name'. + Optional: defaults to False + :type named_columns: bool + + :param column_names: an ordered list of strings that will be used as the keys + for each column in the returned dictionaries. + The number of key, value pairs each returned dictionary has will + be as short as the number of column names provided. + :type column_names: + """ + raise NotImplementedError( 'Work in progress' ) + #TODO: validate is a wig + # still good to maintain a ref to the raw source bc Reader won't + self.raw_source = source + self.parser = bx_bbi.bigwig_file.BigWigFile( source ) + super( BigWigDataProvider, self ).__init__( self.parser, **kwargs ) + + self.named_columns = named_columns + self.column_names = column_names or self.COLUMN_NAMES + + def __iter__( self ): + parent_gen = super( BigWigDataProvider, self ).__iter__() + for three_tuple in parent_gen: + if self.named_columns: + yield dict( zip( self.column_names, three_tuple ) ) + else: + # list is not strictly necessary - but consistent + yield list( three_tuple ) + + +# ----------------------------------------------------------------------------- binary, external conversion or tool +class DatasetSubprocessDataProvider( external.SubprocessDataProvider ): + """ + Create a source from running a subprocess on a dataset's file. + + Uses a subprocess as it's source and has a dataset (gen. as an input file + for the process). + """ + #TODO: below should be a subclass of this and not RegexSubprocess + def __init__( self, dataset, *args, **kwargs ): + """ + :param args: the list of strings used to build commands. + :type args: variadic function args + """ + raise NotImplementedError( 'Abstract class' ) + super( DatasetSubprocessDataProvider, self ).__init__( *args, **kwargs ) + self.dataset = dataset + + +class SamtoolsDataProvider( line.RegexLineDataProvider ): + """ + Data provider that uses samtools on a Sam or Bam file as it's source. 
+ + This can be piped through other providers (column, map, genome region, etc.). + + .. note:: that only the samtools 'view' command is currently implemented. + """ + FLAGS_WO_ARGS = 'bhHSu1xXcB' + FLAGS_W_ARGS = 'fFqlrs' + VALID_FLAGS = FLAGS_WO_ARGS + FLAGS_W_ARGS + + def __init__( self, dataset, options_string='', options_dict=None, regions=None, **kwargs ): + """ + :param options_string: samtools options in string form (flags separated + by spaces) + Optional: defaults to '' + :type options_string: str + :param options_dict: dictionary of samtools options + Optional: defaults to None + :type options_dict: dict or None + :param regions: list of samtools regions strings + Optional: defaults to None + :type regions: list of str or None + """ + #TODO: into validate_source + + #TODO: have to import these here due to circular ref in model/datatypes + import galaxy.datatypes.binary + import galaxy.datatypes.tabular + if( not( isinstance( dataset.datatype, galaxy.datatypes.tabular.Sam ) + or isinstance( dataset.datatype, galaxy.datatypes.binary.Bam ) ) ): + raise exceptions.InvalidDataProviderSource( + 'dataset must be a Sam or Bam datatype: %s' %( str( dataset.datatype ) ) ) + self.dataset = dataset + + options_dict = options_dict or {} + # ensure regions are strings + regions = [ str( r ) for r in regions ] if regions else [] + + #TODO: view only for now + #TODO: not properly using overriding super's validate_opts, command here + subcommand = 'view' + #TODO:?? do we need a path to samtools? + subproc_args = self.build_command_list( subcommand, options_string, options_dict, regions ) +#TODO: the composition/inheritance here doesn't make a lot sense + subproc_provider = external.SubprocessDataProvider( *subproc_args ) + super( SamtoolsDataProvider, self ).__init__( subproc_provider, **kwargs ) + + def build_command_list( self, subcommand, options_string, options_dict, regions ): + """ + Convert all init args to list form. + """ + command = [ 'samtools', subcommand ] + # add options and switches, input file, regions list (if any) + command.extend( self.to_options_list( options_string, options_dict ) ) + command.append( self.dataset.file_name ) + command.extend( regions ) + return command + + def to_options_list( self, options_string, options_dict ): + """ + Convert both options_string and options_dict to list form + while filtering out non-'valid' options. + """ + opt_list = [] + + # strip out any user supplied bash switch formating -> string of option chars + # then compress to single option string of unique, VALID flags with prefixed bash switch char '-' + options_string = options_string.strip( '- ' ) + validated_flag_list = set([ flag for flag in options_string if flag in self.FLAGS_WO_ARGS ]) + + # if sam add -S + if( ( isinstance( self.dataset.datatype, galaxy.datatypes.tabular.Sam ) + and ( 'S' not in validated_flag_list ) ) ): + validated_flag_list.append( 'S' ) + + if validated_flag_list: + opt_list.append( '-' + ''.join( validated_flag_list ) ) + + for flag, arg in options_dict.items(): + if flag in self.FLAGS_W_ARGS: + opt_list.extend([ '-' + flag, str( arg ) ]) + + return opt_list + + @classmethod + def extract_options_from_dict( cls, dictionary ): + """ + Separrates valid samtools key/value pair options from a dictionary and + returns both as a 2-tuple. + """ + # handy for extracting options from kwargs - but otherwise... 
+ #TODO: could be abstracted to util.extract( dict, valid_keys_list ) + options_dict = {} + new_kwargs = {} + for key, value in dictionary.items(): + if key in cls.FLAGS_W_ARGS: + options_dict[ key ] = value + else: + new_kwargs[ key ] = value + return options_dict, new_kwargs + + +class BcftoolsDataProvider( line.RegexLineDataProvider ): + """ + Data provider that uses bcftools on a bcf (or vcf?) file as its source. + + This can be piped through other providers (column, map, genome region, etc.). + """ + def __init__( self, dataset, **kwargs ): + #TODO: as samtools + raise NotImplementedError() + super( BcftoolsDataProvider, self ).__init__( dataset, **kwargs ) + + +class BGzipTabixDataProvider( base.DataProvider ): + """ + Data provider that uses bgzip/tabix on a file as its source. + + This can be piped through other providers (column, map, genome region, etc.). + """ + def __init__( self, dataset, **kwargs ): + #TODO: as samtools - need more info on output format + raise NotImplementedError() + super( BGzipTabixDataProvider, self ).__init__( dataset, **kwargs ) diff -r 5b1f643e9ea4da1dfda2859ef96d734f455c8972 -r 0959eb80de7e42f4bb7d53e44e48583849bf1b81 lib/galaxy/datatypes/dataproviders/decorators.py --- /dev/null +++ b/lib/galaxy/datatypes/dataproviders/decorators.py @@ -0,0 +1,107 @@ +""" +DataProvider related decorators. +""" + +# I'd like to decorate the factory methods that give data_providers by the name they can be accessed from. e.g.: +#@provides( 'id_seq' ) # where 'id_seq' is some 'data_format' string/alias +#def get_id_seq_provider( dataset, **settings ): + +# then in some central dispatch (maybe data.Data), have it look up the proper method by the data_format string + +# also it would be good to have this decorator maintain a list of available providers (for a datatype) + +# i don't particularly want to cut up method names ( get_([\w_]*)_provider ) +#!/usr/bin/env python + +# adapted from: http://stackoverflow.com + # /questions/14095616/python-can-i-programmatically-decorate-class-methods-from-a-class-instance + +from functools import wraps +#from types import MethodType +import copy + +import logging +log = logging.getLogger( __name__ ) + + +# ----------------------------------------------------------------------------- +_DATAPROVIDER_CLASS_MAP_KEY = 'dataproviders' +_DATAPROVIDER_METHOD_NAME_KEY = '_dataprovider_name' + +# ----------------------------------------------------------------------------- +def has_dataproviders( cls ): + """ + Wraps a class (generally a Datatype), finds methods within that have been + decorated with `@dataprovider_factory` and adds them, by their name, to a map + in the class. + + This allows a class to maintain a name -> method map, effectively + 'registering' dataprovider factory methods. + + .. example:: + @has_dataproviders + class MyDtype( data.Data ): + + @dataprovider_factory( 'bler' ) + def provide_some_bler( self, dataset, **settings ): + '''blerblerbler''' + dataset_source = providers.DatasetDataProvider( dataset ) + # ... chain other, intermediate providers here + return providers.BlerDataProvider( dataset_source, **settings ) + + # use the base method in data.Data + provider = dataset.datatype.dataprovider( dataset, 'bler', + my_setting='blah', ... ) + # OR directly from the map + provider = dataset.datatype.dataproviders[ 'bler' ]( dataset, + my_setting='blah', ... ) + """ + #log.debug( 'has_dataproviders:', cls ) + # init the class dataproviders map if necc.
+ if not hasattr( cls, _DATAPROVIDER_CLASS_MAP_KEY ): + setattr( cls, _DATAPROVIDER_CLASS_MAP_KEY, {} ) + else: + # need to deepcopy or subclasses will modify super.dataproviders as well + existing_dataproviders = getattr( cls, _DATAPROVIDER_CLASS_MAP_KEY ) + copied_dataproviders = copy.deepcopy( existing_dataproviders ) + setattr( cls, _DATAPROVIDER_CLASS_MAP_KEY, copied_dataproviders ) + + dataproviders = getattr( cls, _DATAPROVIDER_CLASS_MAP_KEY ) + + # scan for methods with dataprovider names and add them to the map + # note: this has a 'cascading' effect + # where it's possible to override a super's provider with a sub's + for attr_key, attr_value in cls.__dict__.iteritems(): + #log.debug( '\t key:', attr_key ) + # can't use isinstance( attr_value, MethodType ) bc of wrapping + if( ( callable( attr_value ) ) + and ( not attr_key.startswith( "__" ) ) + and ( getattr( attr_value, _DATAPROVIDER_METHOD_NAME_KEY, None ) ) ): + #log.debug( '\t\t is a dataprovider', attr_key ) + name = getattr( attr_value, _DATAPROVIDER_METHOD_NAME_KEY ) + dataproviders[ name ] = attr_value + + #log.debug( 'dataproviders:' ) + #for name, fn in cls.dataproviders.items(): + # log.debug( '\t ', name, '->', fn.__name__, fn ) + # log.debug( '\t\t ', fn.__doc__ ) + return cls + +def dataprovider_factory( name ): + """ + Wraps a class method and marks it as a dataprovider factory. + + :param name: what name/key to register the factory under in `cls.dataproviders` + :type name: any hashable var + """ + #log.debug( 'dataprovider:', name ) + def named_dataprovider_factory( func ): + #log.debug( 'named_dataprovider_factory:', name, '->', func.__name__ ) + setattr( func, _DATAPROVIDER_METHOD_NAME_KEY, name ) + #log.debug( '\t setting:', getattr( func, _DATAPROVIDER_METHOD_NAME_KEY ) ) + @wraps( func ) + def wrapped_dataprovider_factory( self, *args, **kwargs ): + #log.debug( 'wrapped_dataprovider_factory', name, self, args, kwargs ) + return func( self, *args, **kwargs ) + return wrapped_dataprovider_factory + return named_dataprovider_factory diff -r 5b1f643e9ea4da1dfda2859ef96d734f455c8972 -r 0959eb80de7e42f4bb7d53e44e48583849bf1b81 lib/galaxy/datatypes/dataproviders/exceptions.py --- /dev/null +++ b/lib/galaxy/datatypes/dataproviders/exceptions.py @@ -0,0 +1,33 @@ +""" +DataProvider related exceptions. +""" + +class InvalidDataProviderSource( TypeError ): + """ + Raised when an unusable source is passed to a provider. + """ + def __init__( self, source=None, msg='' ): + msg = msg or 'Invalid source for provider: %s' %( source ) + super( InvalidDataProviderSource, self ).__init__( msg ) + + +class NoProviderAvailable( TypeError ): + """ + Raised when no provider is found for the given `format_requested`. + + :param factory_source: the item that the provider was requested from + :param format_requested: the format_requested (a hashable key to access + `factory_source.datatypes` with) + + Both params are attached to this class and accessible to the try/except + receiver. + + Meant to be used within a class that builds dataproviders (e.g.
a Datatype) + """ + def __init__( self, factory_source, format_requested=None, msg='' ): + self.factory_source = factory_source + self.format_requested = format_requested + msg = msg or 'No provider available in factory_source "%s" for format requested' %( str( factory_source ) ) + if self.format_requested: + msg += ': "%s"' %( self.format_requested ) + super( NoProviderAvailable, self ).__init__( msg ) diff -r 5b1f643e9ea4da1dfda2859ef96d734f455c8972 -r 0959eb80de7e42f4bb7d53e44e48583849bf1b81 lib/galaxy/datatypes/dataproviders/external.py --- /dev/null +++ b/lib/galaxy/datatypes/dataproviders/external.py @@ -0,0 +1,165 @@ +""" +Data providers that iterate over a source that is not in memory +or not in a file. +""" + +import subprocess +import urllib, urllib2 +import gzip + +import base +import line + +_TODO = """ +YAGNI: ftp, image, cryptos, sockets +job queue +admin: admin server log rgx/stats, ps aux +""" + +import logging +log = logging.getLogger( __name__ ) + + +# ----------------------------------------------------------------------------- server subprocess / external prog +class SubprocessDataProvider( base.DataProvider ): + """ + Data provider that uses the output of an intermediate program, run in a + subprocess, as its data source. + """ + #TODO: need better ways of checking returncode, stderr for errors and raising + def __init__( self, *args, **kwargs ): + """ + :param args: the list of strings used to build commands. + :type args: variadic function args + """ + self.exit_code = None + command_list = args + self.popen = self.subprocess( *command_list, **kwargs ) + #TODO:?? not communicate()? + super( SubprocessDataProvider, self ).__init__( self.popen.stdout ) + self.exit_code = self.popen.poll() + + #NOTE: there's little protection here against sending a ';' and a dangerous command + # but...we're all adults here, right? ...RIGHT?! + def subprocess( self, *command_list, **kwargs ): + """ + :param args: the list of strings used as commands. + :type args: variadic function args + """ + try: + # how expensive is this? + popen = subprocess.Popen( command_list, stderr=subprocess.PIPE, stdout=subprocess.PIPE ) + log.info( 'opened subprocess (%s), PID: %s' %( str( command_list ), str( popen.pid ) ) ) + #log.debug( 'stderr:\n%s\n' %( popen.stderr.read() ) ) + + except OSError, os_err: + command_str = ' '.join( command_list ) + raise OSError( ' '.join([ str( os_err ), ':', command_str ]) ) + + return popen + + def __exit__( self, *args ): + # poll the subprocess for an exit code + self.exit_code = self.popen.poll() + log.info( '%s.__exit__, exit_code: %s' %( str( self ), str( self.exit_code ) ) ) + return super( SubprocessDataProvider, self ).__exit__( *args ) + + def __str__( self ): + # provide the pid and current return code + source_str = '' + if hasattr( self, 'popen' ): + source_str = '%s:%s' %( str( self.popen.pid ), str( self.popen.poll() ) ) + return '%s(%s)' %( self.__class__.__name__, str( source_str ) ) + + +class RegexSubprocessDataProvider( line.RegexLineDataProvider ): + """ + RegexLineDataProvider that uses a SubprocessDataProvider as its data source. + """ + # this is a convenience class and not really all that necessary... + def __init__( self, *args, **kwargs ): + # using subprocess as proxy data source in filtered line prov.
+ subproc_provider = SubprocessDataProvider( *args ) + super( RegexSubprocessDataProvider, self ).__init__( subproc_provider, **kwargs ) + + +# ----------------------------------------------------------------------------- other apis +class URLDataProvider( base.DataProvider ): + """ + Data provider that uses the contents of a URL for its data source. + + This can be piped through other providers (column, map, genome region, etc.). + """ + VALID_METHODS = ( 'GET', 'POST' ) + + def __init__( self, url, method='GET', data=None, **kwargs ): + """ + :param url: the base URL to open. + :param method: the HTTP method to use. + Optional: defaults to 'GET' + :param data: any data to pass (either in query for 'GET' + or as post data with 'POST') + :type data: dict + """ + self.url = url + self.method = method + + self.data = data or {} + encoded_data = urllib.urlencode( self.data ) + + if method == 'GET': + self.url += '?%s' %( encoded_data ) + opened = urllib2.urlopen( self.url ) + elif method == 'POST': + opened = urllib2.urlopen( url, encoded_data ) + else: + raise ValueError( 'Not a valid method: %s' %( method ) ) + + super( URLDataProvider, self ).__init__( opened, **kwargs ) + #NOTE: the request object is now accessible as self.source + + def __enter__( self ): + return self + + def __exit__( self, *args ): + self.source.close() + + +# ----------------------------------------------------------------------------- generic compression +class GzipDataProvider( base.DataProvider ): + """ + Data provider that uses g(un)zip on a file as its source. + + This can be piped through other providers (column, map, genome region, etc.). + """ + def __init__( self, source, **kwargs ): + unzipped = gzip.GzipFile( source, 'rb' ) + super( GzipDataProvider, self ).__init__( unzipped, **kwargs ) + #NOTE: the GzipFile is now accessible in self.source + + +# ----------------------------------------------------------------------------- intermediate tempfile +class TempfileDataProvider( base.DataProvider ): + """ + Writes the data from the given source to a temp file, allowing + it to be used as a source where a file_name is needed (e.g. as a parameter + to a command line tool: samtools view -t <this_provider.source.file_name>) + """ + def __init__( self, source, **kwargs ): + #TODO: + raise NotImplementedError() + # write the file here + self.create_file() + super( TempfileDataProvider, self ).__init__( self.tmp_file, **kwargs ) + + def create_file( self ): + self.tmp_file = tempfile.NamedTemporaryFile() + return self.tmp_file + + def write_to_file( self ): + parent_gen = super( TempfileDataProvider, self ).__iter__() + #??? + with open( self.tmp_file.name, 'w' ) as open_file: + for datum in parent_gen: + open_file.write( datum + '\n' ) + This diff is so big that we needed to truncate the remainder. Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
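
For readers following the decorators.py portion of the diff above: the registration pattern is easier to see without the Galaxy plumbing. Below is a minimal standalone sketch of the same idea, not code from the changeset; FakeDatatype, its factory method, and the demo arguments are invented for illustration. The factory decorator tags a method with the name it should be published under, and the class decorator gathers every tagged method into a per-class 'dataproviders' map.

from functools import wraps
import copy

_MAP_KEY = 'dataproviders'
_NAME_KEY = '_dataprovider_name'

def dataprovider_factory( name ):
    # tag the factory method with the name it should be registered under;
    # @wraps copies func.__dict__, so the tag survives on the wrapper
    def named_factory( func ):
        setattr( func, _NAME_KEY, name )
        @wraps( func )
        def wrapped( self, *args, **kwargs ):
            return func( self, *args, **kwargs )
        return wrapped
    return named_factory

def has_dataproviders( cls ):
    # deep-copy any inherited map so a subclass never mutates its parent's registry
    registry = copy.deepcopy( getattr( cls, _MAP_KEY, {} ) )
    setattr( cls, _MAP_KEY, registry )
    # scan this class's own attributes for tagged factory methods
    for attr_name, attr_value in list( vars( cls ).items() ):
        provider_name = getattr( attr_value, _NAME_KEY, None )
        if callable( attr_value ) and provider_name:
            registry[ provider_name ] = attr_value
    return cls

@has_dataproviders
class FakeDatatype( object ):
    # stands in for a Galaxy datatype class; 'line' is the published provider name
    @dataprovider_factory( 'line' )
    def line_dataprovider( self, dataset, **settings ):
        # a real factory would wrap `dataset` in a provider; echo the args for the demo
        return ( 'line provider over', dataset, settings )

factory = FakeDatatype.dataproviders[ 'line' ]
print( factory( FakeDatatype(), 'some-dataset', limit=10 ) )

As in the changeset, the inherited map is copied per class, so a subclass can add or override an entry under the same name without touching its parent's registry.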
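Similarly, the option handling in SamtoolsDataProvider.build_command_list / to_options_list can be summarized as one small pure function. This is a hedged, standalone rendering rather than the changeset's code: the file name 'input.sam' and the option values are invented, and the flag group is sorted here only to make the printed result deterministic (the original does not sort).

FLAGS_WO_ARGS = 'bhHSu1xXcB'     # samtools view switches that take no argument
FLAGS_W_ARGS = 'fFqlrs'          # samtools view options that take an argument

def build_samtools_view_argv( file_name, options_string='', options_dict=None,
                              regions=None, is_sam=True ):
    options_dict = options_dict or {}
    regions = [ str( r ) for r in ( regions or [] ) ]
    # strip user-supplied dashes/spaces, keep only known no-arg flags, de-duplicate
    flags = set( f for f in options_string.strip( '- ' ) if f in FLAGS_WO_ARGS )
    if is_sam:
        flags.add( 'S' )         # tell samtools view the input is SAM, not BAM
    opt_list = [ '-' + ''.join( sorted( flags ) ) ] if flags else []
    for flag, arg in sorted( options_dict.items() ):
        if flag in FLAGS_W_ARGS:
            opt_list.extend([ '-' + flag, str( arg ) ])
    return [ 'samtools', 'view' ] + opt_list + [ file_name ] + regions

print( build_samtools_view_argv( 'input.sam', '-h', { 'q': 30 }, [ 'chr1:1-1000' ] ) )
# ['samtools', 'view', '-Sh', '-q', '30', 'input.sam', 'chr1:1-1000']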
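Finally, the repeated note that a provider "can be piped through other providers (column, map, genome region, etc.)" comes down to each provider wrapping another iterable and doing its own work inside __iter__. A minimal sketch of that composition, with made-up CommentFilter and Limiter classes standing in for the changeset's FilteredLineDataProvider and LimitedOffsetDataProvider:

class DataProvider( object ):
    # wraps any iterable and re-yields its data
    def __init__( self, source ):
        self.source = source
    def __iter__( self ):
        for datum in self.source:
            yield datum

class CommentFilter( DataProvider ):
    # drops lines that start with comment_char
    def __init__( self, source, comment_char='#' ):
        super( CommentFilter, self ).__init__( source )
        self.comment_char = comment_char
    def __iter__( self ):
        for line in super( CommentFilter, self ).__iter__():
            if not line.startswith( self.comment_char ):
                yield line

class Limiter( DataProvider ):
    # stops after `limit` data have been yielded
    def __init__( self, source, limit=None ):
        super( Limiter, self ).__init__( source )
        self.limit = limit
    def __iter__( self ):
        for i, datum in enumerate( super( Limiter, self ).__iter__() ):
            if self.limit is not None and i >= self.limit:
                return
            yield datum

raw_lines = [ '@HD header', 'read1 ...', 'read2 ...', 'read3 ...' ]
for line in Limiter( CommentFilter( raw_lines, comment_char='@' ), limit=2 ):
    print( line )
# prints 'read1 ...' and 'read2 ...'; the header and the third read are dropped

The same nesting is what SamtoolsDataProvider above does when it hands a SubprocessDataProvider to RegexLineDataProvider.__init__.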