7 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/25277dc0e82f/ Changeset: 25277dc0e82f User: dannon Date: 2013-06-17 18:48:16 Summary: Remove cruft Affected #: 1 file diff -r cac2f16e960c698101f16f9cb6191f63eb03e6a8 -r 25277dc0e82f69853c9bf6c16b14d70b6399b327 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -919,8 +919,6 @@ return self.fail( "Job %s's output dataset(s) could not be read" % job.id ) job_context = ExpressionContext( dict( stdout = job.stdout, stderr = job.stderr ) ) - #DBTODO unused - #job_tool = self.app.toolbox.tools_by_id.get( job.tool_id, None ) for dataset_assoc in job.output_datasets + job.output_library_datasets: context = self.get_dataset_finish_context( job_context, dataset_assoc.dataset.dataset ) #should this also be checking library associations? - can a library item be added from a history before the job has ended? - lets not allow this to occur https://bitbucket.org/galaxy/galaxy-central/commits/159fadd1ee0c/ Changeset: 159fadd1ee0c User: dannon Date: 2013-06-18 02:46:58 Summary: Enable workflow continuation (Remapping of paused dependent jobs after state failure). Affected #: 10 files diff -r 25277dc0e82f69853c9bf6c16b14d70b6399b327 -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -692,7 +692,10 @@ incoming['__user_email__'] = incoming['userEmail'] = user_email incoming['__user_name__'] = user_name # Build params, done before hook so hook can use - param_dict = self.tool.build_param_dict( incoming, inp_data, out_data, self.get_output_fnames(), self.working_directory ) + param_dict = self.tool.build_param_dict( incoming, + inp_data, out_data, + self.get_output_fnames(), + self.working_directory ) # Certain tools require tasks to be completed prior to job execution # ( this used to be performed in the "exec_before_job" hook, but hooks are deprecated ). 
self.tool.exec_before_job( self.queue.app, inp_data, out_data, param_dict ) diff -r 25277dc0e82f69853c9bf6c16b14d70b6399b327 -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 lib/galaxy/jobs/handler.py --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -10,7 +10,7 @@ from sqlalchemy.sql.expression import and_, or_, select, func -from galaxy import util, model +from galaxy import model from galaxy.jobs import Sleeper, JobWrapper, TaskWrapper, JobDestination log = logging.getLogger( __name__ ) diff -r 25277dc0e82f69853c9bf6c16b14d70b6399b327 -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 lib/galaxy/model/mapping.py --- a/lib/galaxy/model/mapping.py +++ b/lib/galaxy/model/mapping.py @@ -1616,17 +1616,27 @@ primaryjoin=( model.LibraryDatasetDatasetInfoAssociation.table.c.form_values_id == model.FormValues.table.c.id ) ) ) ) -mapper( model.JobToInputDatasetAssociation, model.JobToInputDatasetAssociation.table, - properties=dict( job=relation( model.Job ), dataset=relation( model.HistoryDatasetAssociation, lazy=False, backref="dependent_jobs" ) ) ) +mapper( model.JobToInputDatasetAssociation, + model.JobToInputDatasetAssociation.table, properties=dict( + job=relation( model.Job ), dataset=relation( + model.HistoryDatasetAssociation, lazy=False, + backref="dependent_jobs" ) ) ) -mapper( model.JobToOutputDatasetAssociation, model.JobToOutputDatasetAssociation.table, - properties=dict( job=relation( model.Job ), dataset=relation( model.HistoryDatasetAssociation, lazy=False ) ) ) +mapper( model.JobToOutputDatasetAssociation, + model.JobToOutputDatasetAssociation.table, properties=dict( + job=relation( model.Job ), dataset=relation( + model.HistoryDatasetAssociation, lazy=False ) ) ) -mapper( model.JobToInputLibraryDatasetAssociation, model.JobToInputLibraryDatasetAssociation.table, - properties=dict( job=relation( model.Job ), dataset=relation( model.LibraryDatasetDatasetAssociation, lazy=False, backref="dependent_jobs" ) ) ) +mapper( model.JobToInputLibraryDatasetAssociation, + model.JobToInputLibraryDatasetAssociation.table, properties=dict( + job=relation( model.Job ), dataset=relation( + model.LibraryDatasetDatasetAssociation, lazy=False, + backref="dependent_jobs" ) ) ) -mapper( model.JobToOutputLibraryDatasetAssociation, model.JobToOutputLibraryDatasetAssociation.table, - properties=dict( job=relation( model.Job ), dataset=relation( model.LibraryDatasetDatasetAssociation, lazy=False ) ) ) +mapper( model.JobToOutputLibraryDatasetAssociation, + model.JobToOutputLibraryDatasetAssociation.table, properties=dict( + job=relation( model.Job ), dataset=relation( + model.LibraryDatasetDatasetAssociation, lazy=False ) ) ) mapper( model.JobParameter, model.JobParameter.table ) diff -r 25277dc0e82f69853c9bf6c16b14d70b6399b327 -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 lib/galaxy/tools/__init__.py --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -816,6 +816,7 @@ """ def __init__( self ): self.page = 0 + self.rerun_remap_job_id = None self.inputs = None def encode( self, tool, app, secure=True ): """ @@ -825,6 +826,7 @@ # page in that dict value = params_to_strings( tool.inputs, self.inputs, app ) value["__page__"] = self.page + value["__rerun_remap_job_id__"] = self.rerun_remap_job_id value = simplejson.dumps( value ) # Make it secure if secure: @@ -846,6 +848,7 @@ # Restore from string values = json_fix( simplejson.loads( value ) ) self.page = values.pop( "__page__" ) + self.rerun_remap_job_id = values.pop( "__rerun_remap_job_id__" ) self.inputs = params_from_strings( 
tool.inputs, values, app, ignore_errors=True ) class ToolOutput( object ): @@ -933,6 +936,7 @@ self.input_required = False self.display_interface = True self.require_login = False + self.rerun = False # Define a place to keep track of all input These # differ from the inputs dictionary in that inputs can be page # elements like conditionals, but input_params are basic form @@ -1933,7 +1937,11 @@ # If we've completed the last page we can execute the tool elif state.page == self.last_page: try: - _, out_data = self.execute( trans, incoming=params, history=history ) + rerun_remap_job_id = None + if state.rerun_remap_job_id is not None: + rerun_remap_job_id = trans.app.security.decode_id(state.rerun_remap_job_id) + log.debug('######### %s/%s' % (state.rerun_remap_job_id, rerun_remap_job_id)) + _, out_data = self.execute( trans, incoming=params, history=history, rerun_remap_job_id=rerun_remap_job_id ) except httpexceptions.HTTPFound, e: #if it's a paste redirect exception, pass it up the stack raise e @@ -2502,7 +2510,6 @@ datatypes_registry = self.app.datatypes_registry, tool = self, name = name ) - if data: for child in data.children: param_dict[ "_CHILD___%s___%s" % ( name, child.designation ) ] = DatasetFilenameWrapper( child ) diff -r 25277dc0e82f69853c9bf6c16b14d70b6399b327 -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 lib/galaxy/tools/actions/__init__.py --- a/lib/galaxy/tools/actions/__init__.py +++ b/lib/galaxy/tools/actions/__init__.py @@ -1,15 +1,16 @@ +import os +import galaxy.tools + +from galaxy.exceptions import ObjectInvalid from galaxy.model import LibraryDatasetDatasetAssociation -from galaxy.util.bunch import Bunch +from galaxy.tools.parameters import DataToolParameter, SelectToolParameter +from galaxy.tools.parameters.grouping import Conditional, Repeat +from galaxy.util.json import from_json_string +from galaxy.util.json import to_json_string +from galaxy.util.none_like import NoneDataset from galaxy.util.odict import odict -from galaxy.util.json import to_json_string -from galaxy.tools.parameters import * -from galaxy.tools.parameters.grouping import * from galaxy.util.template import fill_template -from galaxy.util.none_like import NoneDataset from galaxy.web import url_for -from galaxy.exceptions import ObjectInvalid -import galaxy.tools -from types import * import logging log = logging.getLogger( __name__ ) @@ -107,7 +108,7 @@ tool.visit_inputs( param_values, visitor ) return input_datasets - def execute(self, tool, trans, incoming={}, return_job=False, set_output_hid=True, set_output_history=True, history=None, job_params=None ): + def execute(self, tool, trans, incoming={}, return_job=False, set_output_hid=True, set_output_history=True, history=None, job_params=None, rerun_remap_job_id=None): """ Executes a tool, creating job and tool outputs, associating them, and submitting the job to the job queue. If history is not specified, use @@ -409,7 +410,41 @@ job.params = to_json_string( job_params ) job.set_handler(tool.get_job_handler(job_params)) trans.sa_session.add( job ) - trans.sa_session.flush() + # Now that we have a job id, we can remap any outputs if this is a rerun and the user chose to continue dependent jobs + # This functionality requires tracking jobs in the database. 
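# a minimal sketch (names such as trans, tool, failed_job, params and history are assumed to be
# in scope) of how the remap id travels in this changeset: tool_runner.py encodes the failed
# job's id into the tool state, the state round-trips through the form as
# "__rerun_remap_job_id__", and tools/__init__.py decodes it before calling execute():
encoded_id = trans.app.security.encode_id( failed_job.id )             # stored on state.rerun_remap_job_id
state_value = { "__page__": 0, "__rerun_remap_job_id__": encoded_id }  # what DefaultToolState.encode serializes
rerun_remap_job_id = trans.app.security.decode_id( state_value[ "__rerun_remap_job_id__" ] )
_, out_data = tool.execute( trans, incoming=params, history=history,
                            rerun_remap_job_id=rerun_remap_job_id )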
+ if trans.app.config.track_jobs_in_database and rerun_remap_job_id is not None: + try: + old_job = trans.sa_session.query( trans.app.model.Job ).get(rerun_remap_job_id) + assert old_job is not None, '(%s/%s): Old job id is invalid' % (rerun_remap_job_id, job.id) + assert old_job.tool_id == job.tool_id, '(%s/%s): Old tool id (%s) does not match rerun tool id (%s)' % (old_job.id, job.id, old_job.tool_id, job.tool_id) + if trans.user is not None: + assert old_job.user_id == trans.user.id, '(%s/%s): Old user id (%s) does not match rerun user id (%s)' % (old_job.id, job.id, old_job.user_id, trans.user.id) + elif trans.user is None and type( galaxy_session ) == trans.model.GalaxySession: + assert old_job.session_id == galaxy_session.id, '(%s/%s): Old session id (%s) does not match rerun session id (%s)' % (old_job.id, job.id, old_job.session_id, galaxy_session.id) + else: + raise Exception('(%s/%s): Remapping via the API is not (yet) supported' % (old_job.id, job.id)) + for jtod in old_job.output_datasets: + for (job_to_remap, jtid) in [(jtid.job, jtid) for jtid in jtod.dataset.dependent_jobs]: + if (trans.user is not None and job_to_remap.user_id == trans.user.id) or (trans.user is None and job_to_remap.session_id == galaxy_session.id): + if job_to_remap.state == job_to_remap.states.PAUSED: + job_to_remap.state = job_to_remap.states.NEW + for hda in [ dep_jtod.dataset for dep_jtod in job_to_remap.output_datasets ]: + if hda.state == hda.states.PAUSED: + hda.state = hda.states.NEW + hda.info = None + for p in job_to_remap.parameters: + if p.name == jtid.name and p.value == str(jtod.dataset.id): + p.value = str(out_data[jtod.name].id) + jtid.dataset = out_data[jtod.name] + jtid.dataset.hid = jtod.dataset.hid + log.info('Job %s input HDA %s remapped to new HDA %s' % (job_to_remap.id, jtod.dataset.id, jtid.dataset.id)) + trans.sa_session.add(job_to_remap) + trans.sa_session.add(jtid) + jtod.dataset.visible = False + trans.sa_session.add(jtod) + trans.sa_session.flush() + except Exception, e: + log.exception('Cannot remap rerun dependencies.') # Some tools are not really executable, but jobs are still created for them ( for record keeping ). # Examples include tools that redirect to other applications ( epigraph ). These special tools must # include something that can be retrieved from the params ( e.g., REDIRECT_URL ) to keep the job diff -r 25277dc0e82f69853c9bf6c16b14d70b6399b327 -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 lib/galaxy/tools/actions/history_imp_exp.py --- a/lib/galaxy/tools/actions/history_imp_exp.py +++ b/lib/galaxy/tools/actions/history_imp_exp.py @@ -9,7 +9,7 @@ class ImportHistoryToolAction( ToolAction ): """Tool action used for importing a history to an archive. """ - def execute( self, tool, trans, incoming = {}, set_output_hid = False, overwrite = True, history=None ): + def execute( self, tool, trans, incoming = {}, set_output_hid = False, overwrite = True, history=None, **kwargs ): # # Create job. # @@ -57,7 +57,7 @@ class ExportHistoryToolAction( ToolAction ): """Tool action used for exporting a history to an archive. """ - def execute( self, tool, trans, incoming = {}, set_output_hid = False, overwrite = True, history=None ): + def execute( self, tool, trans, incoming = {}, set_output_hid = False, overwrite = True, history=None, **kwargs ): # # Get history to export. 
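# a standalone sketch (toy class names, not Galaxy's) of why the execute() overrides in these
# tool actions now accept **kwargs: the default action can grow new keyword arguments, such as
# rerun_remap_job_id, without every specialized action having to list them explicitly.
class BaseToolishAction( object ):
    def execute( self, tool, trans, incoming={}, rerun_remap_job_id=None, **kwargs ):
        return 'default action handles rerun_remap_job_id=%s' % rerun_remap_job_id

class UploadishAction( BaseToolishAction ):
    # accepts-and-ignores unknown keywords instead of raising TypeError
    def execute( self, tool, trans, incoming={}, **kwargs ):
        return 'upload-style action ignores extras: %s' % sorted( kwargs.keys() )

result = UploadishAction().execute( None, None, incoming={}, rerun_remap_job_id=42 )
# -> "upload-style action ignores extras: ['rerun_remap_job_id']"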
# diff -r 25277dc0e82f69853c9bf6c16b14d70b6399b327 -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 lib/galaxy/tools/actions/metadata.py --- a/lib/galaxy/tools/actions/metadata.py +++ b/lib/galaxy/tools/actions/metadata.py @@ -9,7 +9,7 @@ class SetMetadataToolAction( ToolAction ): """Tool action used for setting external metadata on an existing dataset""" - def execute( self, tool, trans, incoming={}, set_output_hid=False, overwrite=True, history=None, job_params=None ): + def execute( self, tool, trans, incoming={}, set_output_hid=False, overwrite=True, history=None, job_params=None, **kwargs ): """ Execute using a web transaction. """ diff -r 25277dc0e82f69853c9bf6c16b14d70b6399b327 -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 lib/galaxy/tools/actions/upload.py --- a/lib/galaxy/tools/actions/upload.py +++ b/lib/galaxy/tools/actions/upload.py @@ -5,7 +5,7 @@ log = logging.getLogger( __name__ ) class UploadToolAction( ToolAction ): - def execute( self, tool, trans, incoming={}, set_output_hid = True, history=None ): + def execute( self, tool, trans, incoming={}, set_output_hid = True, history=None, **kwargs ): dataset_upload_inputs = [] for input_name, input in tool.inputs.iteritems(): if input.type == "upload_dataset": diff -r 25277dc0e82f69853c9bf6c16b14d70b6399b327 -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 lib/galaxy/webapps/galaxy/controllers/tool_runner.py --- a/lib/galaxy/webapps/galaxy/controllers/tool_runner.py +++ b/lib/galaxy/webapps/galaxy/controllers/tool_runner.py @@ -218,6 +218,14 @@ # Create a fake tool_state for the tool, with the parameters values state = tool.new_state( trans ) state.inputs = params_objects + # If the job failed and has dependencies, allow dependency remap + if job.state == job.states.ERROR: + try: + if [ hda.dependent_jobs for hda in [ jtod.dataset for jtod in job.output_datasets ] if hda.dependent_jobs ]: + state.rerun_remap_job_id = trans.app.security.encode_id(job.id) + except: + # Job has no outputs? + pass #create an incoming object from the original job's dataset-modified param objects incoming = {} params_to_incoming( incoming, tool.inputs, params_objects, trans.app ) diff -r 25277dc0e82f69853c9bf6c16b14d70b6399b327 -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 templates/webapps/galaxy/tool_form.mako --- a/templates/webapps/galaxy/tool_form.mako +++ b/templates/webapps/galaxy/tool_form.mako @@ -253,6 +253,18 @@ </div></%def> +<%def name="row_for_rerun()"> + %if trans.app.config.track_jobs_in_database and tool_state.rerun_remap_job_id is not None: + <div class="form-row"> + <input type="checkbox" name="rerun_remap_job_id" value="${tool_state.rerun_remap_job_id}"> Resume dependencies from this job + <div class="toolParamHelp" style="clear: both;"> + The previous run of this tool failed and other tools were waiting for it to finish successfully, use this option to resume those tools using the outputs of this tool run. 
+ </div> + <div> + <div style="clear: both;"></div> + %endif +</%def> + <% overlay(visible=False) %> %if add_frame.from_noframe: @@ -321,9 +333,13 @@ <input type="hidden" name="tool_state" value="${util.object_to_string( tool_state.encode( tool, app ) )}"> %if tool.display_by_page[tool_state.page]: ${trans.fill_template_string( tool.display_by_page[tool_state.page], context=tool.get_param_html_map( trans, tool_state.page, tool_state.inputs ) )} + ${row_for_rerun()} <input type="submit" class="btn btn-primary" name="runtool_btn" value="Execute"> %else: ${do_inputs( tool.inputs_by_page[ tool_state.page ], tool_state.inputs, errors, "" )} + %if tool_state.page == tool.last_page: + ${row_for_rerun()} + %endif <div class="form-row form-actions"> %if tool_state.page == tool.last_page: <input type="submit" class="btn btn-primary" name="runtool_btn" value="Execute"> https://bitbucket.org/galaxy/galaxy-central/commits/8df7f1c47489/ Changeset: 8df7f1c47489 User: dannon Date: 2013-06-18 02:48:57 Summary: Merge Affected #: 41 files diff -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 -r 8df7f1c474896ed75557efc6f3359d09c3772c26 lib/galaxy/datatypes/binary.py --- a/lib/galaxy/datatypes/binary.py +++ b/lib/galaxy/datatypes/binary.py @@ -22,6 +22,7 @@ from galaxy.datatypes.metadata import MetadataElement from galaxy.datatypes import metadata from galaxy.datatypes.sniff import * +import dataproviders log = logging.getLogger(__name__) @@ -74,6 +75,7 @@ trans.response.headers["Content-Disposition"] = 'attachment; filename="Galaxy%s-[%s].%s"' % (dataset.hid, fname, to_ext) return open( dataset.file_name ) + class Ab1( Binary ): """Class describing an ab1 binary sequence file""" file_ext = "ab1" @@ -93,12 +95,15 @@ Binary.register_unsniffable_binary_ext("ab1") + class GenericAsn1Binary( Binary ): """Class for generic ASN.1 binary format""" file_ext = "asn1-binary" Binary.register_unsniffable_binary_ext("asn1-binary") + +@dataproviders.decorators.has_dataproviders class Bam( Binary ): """Class describing a BAM binary file""" file_ext = "bam" @@ -255,9 +260,92 @@ return dataset.peek except: return "Binary bam alignments file (%s)" % ( data.nice_size( dataset.get_size() ) ) + + # ------------- Dataproviders + # pipe through samtools view + #ALSO: (as Sam) + # bam does not use '#' to indicate comments/headers - we need to strip out those headers from the std. providers + #TODO:?? seems like there should be an easier way to do/inherit this - metadata.comment_char? + #TODO: incorporate samtools options to control output: regions first, then flags, etc. 
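# a standalone sketch (outside Galaxy; hypothetical BAM path, samtools assumed on $PATH) of the
# idea the Bam providers below build on: stream `samtools view` output and treat '@'-prefixed
# lines as headers/comments.
import subprocess
bam_path = '/tmp/example.bam'
proc = subprocess.Popen( [ 'samtools', 'view', '-h', bam_path ], stdout=subprocess.PIPE )
regions = []
for line in proc.stdout:
    if line.startswith( '@' ):
        # header lines - the providers below drop these via settings[ 'comment_char' ] = '@'
        continue
    columns = line.rstrip( '\n' ).split( '\t' )
    regions.append( ( columns[ 2 ], int( columns[ 3 ] ) ) )   # the same columns the genomic-region provider selects
proc.stdout.close()
proc.wait()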
+ @dataproviders.decorators.dataprovider_factory( 'line' ) + def line_dataprovider( self, dataset, **settings ): + samtools_source = dataproviders.dataset.SamtoolsDataProvider( dataset ) + settings[ 'comment_char' ] = '@' + return dataproviders.line.FilteredLineDataProvider( samtools_source, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'regex-line' ) + def regex_line_dataprovider( self, dataset, **settings ): + samtools_source = dataproviders.dataset.SamtoolsDataProvider( dataset ) + settings[ 'comment_char' ] = '@' + return dataproviders.line.RegexLineDataProvider( samtools_source, **settings ) + @dataproviders.decorators.dataprovider_factory( 'column' ) + def column_dataprovider( self, dataset, **settings ): + samtools_source = dataproviders.dataset.SamtoolsDataProvider( dataset ) + settings[ 'comment_char' ] = '@' + return dataproviders.column.ColumnarDataProvider( samtools_source, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'map' ) + def map_dataprovider( self, dataset, **settings ): + samtools_source = dataproviders.dataset.SamtoolsDataProvider( dataset ) + settings[ 'comment_char' ] = '@' + return dataproviders.column.MapDataProvider( samtools_source, **settings ) + + # these can't be used directly - may need BamColumn, BamMap (Bam metadata -> column/map) + # OR - see genomic_region_dataprovider + #@dataproviders.decorators.dataprovider_factory( 'dataset-column' ) + #def dataset_column_dataprovider( self, dataset, **settings ): + # settings[ 'comment_char' ] = '@' + # return super( Sam, self ).dataset_column_dataprovider( dataset, **settings ) + + #@dataproviders.decorators.dataprovider_factory( 'dataset-map' ) + #def dataset_map_dataprovider( self, dataset, **settings ): + # settings[ 'comment_char' ] = '@' + # return super( Sam, self ).dataset_map_dataprovider( dataset, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'header' ) + def header_dataprovider( self, dataset, **settings ): + # in this case we can use an option of samtools view to provide just what we need (w/o regex) + samtools_source = dataproviders.dataset.SamtoolsDataProvider( dataset, '-H' ) + return dataproviders.line.RegexLineDataProvider( samtools_source, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'id-seq-qual' ) + def id_seq_qual_dataprovider( self, dataset, **settings ): + settings[ 'indeces' ] = [ 0, 9, 10 ] + settings[ 'column_types' ] = [ 'str', 'str', 'str' ] + settings[ 'column_names' ] = [ 'id', 'seq', 'qual' ] + return self.map_dataprovider( dataset, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'genomic-region' ) + def genomic_region_dataprovider( self, dataset, **settings ): + # GenomicRegionDataProvider currently requires a dataset as source - may not be necc. + #TODO:?? consider (at least) the possible use of a kwarg: metadata_source (def. to source.dataset), + # or remove altogether... 
+ #samtools_source = dataproviders.dataset.SamtoolsDataProvider( dataset ) + #return dataproviders.dataset.GenomicRegionDataProvider( samtools_source, metadata_source=dataset, + # 2, 3, 3, **settings ) + + # instead, set manually and use in-class column gen + settings[ 'indeces' ] = [ 2, 3, 3 ] + settings[ 'column_types' ] = [ 'str', 'int', 'int' ] + return self.column_dataprovider( dataset, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'genomic-region-map' ) + def genomic_region_map_dataprovider( self, dataset, **settings ): + settings[ 'indeces' ] = [ 2, 3, 3 ] + settings[ 'column_types' ] = [ 'str', 'int', 'int' ] + settings[ 'column_names' ] = [ 'chrom', 'start', 'end' ] + return self.map_dataprovider( dataset, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'samtools' ) + def samtools_dataprovider( self, dataset, **settings ): + """Generic samtools interface - all options available through settings.""" + dataset_source = dataproviders.dataset.DatasetDataProvider( dataset ) + return dataproviders.dataset.SamtoolsDataProvider( dataset_source, **settings ) + Binary.register_sniffable_binary_format("bam", "bam", Bam) + class H5( Binary ): """Class describing an HDF5 file""" file_ext = "h5" @@ -277,6 +365,7 @@ Binary.register_unsniffable_binary_ext("h5") + class Scf( Binary ): """Class describing an scf binary sequence file""" file_ext = "scf" @@ -296,6 +385,7 @@ Binary.register_unsniffable_binary_ext("scf") + class Sff( Binary ): """ Standard Flowgram Format (SFF) """ file_ext = "sff" @@ -327,6 +417,7 @@ Binary.register_sniffable_binary_format("sff", "sff", Sff) + class BigWig(Binary): """ Accessing binary BigWig files from UCSC. @@ -363,6 +454,7 @@ Binary.register_sniffable_binary_format("bigwig", "bigwig", BigWig) + class BigBed(BigWig): """BigBed support from UCSC.""" @@ -375,6 +467,7 @@ Binary.register_sniffable_binary_format("bigbed", "bigbed", BigBed) + class TwoBit (Binary): """Class describing a TwoBit format nucleotide file""" diff -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 -r 8df7f1c474896ed75557efc6f3359d09c3772c26 lib/galaxy/datatypes/data.py --- a/lib/galaxy/datatypes/data.py +++ b/lib/galaxy/datatypes/data.py @@ -14,6 +14,8 @@ from galaxy.util.odict import odict from galaxy.util.sanitize_html import sanitize_html +import dataproviders + from galaxy import eggs eggs.require( "Paste" ) import paste @@ -56,6 +58,7 @@ cls.metadata_spec.update( base.metadata_spec ) #add contents of metadata spec of base class to cls metadata.Statement.process( cls ) +@dataproviders.decorators.has_dataproviders class Data( object ): """ Base class for all datatypes. Implements basic interfaces as well @@ -578,6 +581,40 @@ return [ 'trackster', 'circster' ] return [] + # ------------- Dataproviders + def has_dataprovider( self, data_format ): + """ + Returns True if `data_format` is available in `dataproviders`. + """ + return ( data_format in self.dataproviders ) + + def dataprovider( self, dataset, data_format, **settings ): + """ + Base dataprovider factory for all datatypes that returns the proper provider + for the given `data_format` or raises a `NoProviderAvailable`. + """ + #TODO:?? is this handling super class providers? 
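# a minimal usage sketch of this dispatch, assuming `hda` is a DatasetInstance whose datatype
# registered factories via @dataproviders.decorators.dataprovider_factory (for example 'line'
# on Text or 'column' on Bam above); limit is assumed to pass through to the provider as the
# column provider docstrings note:
datatype = hda.datatype
if datatype.has_dataprovider( 'column' ):
    provider = datatype.dataprovider( hda, 'column', indeces=[ 0, 1 ], limit=10 )
else:
    provider = datatype.dataprovider( hda, 'line', limit=10 )
rows = list( provider )   # lists of selected columns (or raw lines) from the first 10 data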
+ if self.has_dataprovider( data_format ): + return self.dataproviders[ data_format ]( self, dataset, **settings ) + raise dataproviders.exceptions.NoProviderAvailable( self, data_format ) + + @dataproviders.decorators.dataprovider_factory( 'base' ) + def base_dataprovider( self, dataset, **settings ): + dataset_source = dataproviders.dataset.DatasetDataProvider( dataset ) + return dataproviders.base.DataProvider( dataset_source, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'chunk' ) + def chunk_dataprovider( self, dataset, **settings ): + dataset_source = dataproviders.dataset.DatasetDataProvider( dataset ) + return dataproviders.chunk.ChunkDataProvider( dataset_source, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'chunk64' ) + def chunk64_dataprovider( self, dataset, **settings ): + dataset_source = dataproviders.dataset.DatasetDataProvider( dataset ) + return dataproviders.chunk.Base64ChunkDataProvider( dataset_source, **settings ) + + +@dataproviders.decorators.has_dataproviders class Text( Data ): file_ext = 'txt' line_class = 'line' @@ -747,10 +784,31 @@ f.close() split = classmethod(split) + # ------------- Dataproviders + @dataproviders.decorators.dataprovider_factory( 'line' ) + def line_dataprovider( self, dataset, **settings ): + """ + Returns an iterator over the dataset's lines (that have been `strip`ed) + optionally excluding blank lines and lines that start with a comment character. + """ + dataset_source = dataproviders.dataset.DatasetDataProvider( dataset ) + return dataproviders.line.FilteredLineDataProvider( dataset_source, **settings ) + + @dataproviders.decorators.dataprovider_factory( 'regex-line' ) + def regex_line_dataprovider( self, dataset, **settings ): + """ + Returns an iterator over the dataset's lines + optionally including/excluding lines that match one or more regex filters. + """ + dataset_source = dataproviders.dataset.DatasetDataProvider( dataset ) + return dataproviders.line.RegexLineDataProvider( dataset_source, **settings ) + + class GenericAsn1( Text ): """Class for generic ASN.1 text format""" file_ext = 'asn1' + class LineCount( Text ): """ Dataset contains a single line with a single integer that denotes the @@ -758,6 +816,7 @@ """ pass + class Newick( Text ): """New Hampshire/Newick Format""" file_ext = "nhx" diff -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 -r 8df7f1c474896ed75557efc6f3359d09c3772c26 lib/galaxy/datatypes/dataproviders/__init__.py --- /dev/null +++ b/lib/galaxy/datatypes/dataproviders/__init__.py @@ -0,0 +1,28 @@ + +#TODO: ---- This is a work in progress ---- +""" +Dataproviders are iterators with context managers that provide data to some +consumer datum by datum. + +As well as subclassing and overriding to get the proper data, Dataproviders +can be piped from one to the other. +..example:: + +.. note:: be careful to NOT pipe providers into subclasses of those providers. + Subclasses provide all the functionality of their superclasses, + so there's generally no need. + +.. note:: be careful to when using piped providers that accept the same keywords + in their __init__ functions (such as limit or offset) to pass those + keywords to the proper (often final) provider. These errors that result + can be hard to diagnose. 
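# a minimal piping sketch (hypothetical file path) for the notes above: any iterable can seed a
# pipe, and keywords such as limit/offset belong on the outermost provider.
from galaxy.datatypes.dataproviders import base
raw = open( '/tmp/example.txt' )
provider = base.LimitedOffsetDataProvider( raw, offset=100, limit=10 )
for datum in provider:
    pass   # yields (at most) lines 101-110, then the context manager closes the file
# the Bam providers above pipe a samtools source into the line providers in the same way,
# passing comment_char to the outer (line) provider rather than to the inner source.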
+""" +import decorators +import exceptions + +import base +import chunk +import line +import column +import external +import dataset diff -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 -r 8df7f1c474896ed75557efc6f3359d09c3772c26 lib/galaxy/datatypes/dataproviders/base.py --- /dev/null +++ b/lib/galaxy/datatypes/dataproviders/base.py @@ -0,0 +1,260 @@ +""" +Base class(es) for all DataProviders. +""" +# there's a blurry line between functionality here and functionality in datatypes module +# attempting to keep parsing to a minimum here and focus on chopping/pagination/reformat(/filtering-maybe?) +# and using as much pre-computed info/metadata from the datatypes module as possible +# also, this shouldn't be a replacement/re-implementation of the tool layer +# (which provides traceability/versioning/reproducibility) + +from collections import deque +import exceptions + +_TODO = """ +hooks into datatypes (define providers inside datatype modules) as factories +capture tell() when provider is done + def stop( self ): self.endpoint = source.tell(); raise StopIteration() +implement __len__ sensibly where it can be (would be good to have where we're giving some progress - '100 of 300') + seems like sniffed files would have this info +unit tests +add datum entry/exit point methods: possibly decode, encode + or create a class that pipes source through - how would decode work then? + +icorporate existing visualization/dataproviders +some of the sources (esp. in datasets) don't need to be re-created + +YAGNI: InterleavingMultiSourceDataProvider, CombiningMultiSourceDataProvider +""" + +import logging +log = logging.getLogger( __name__ ) + + +# ----------------------------------------------------------------------------- base classes +class DataProvider( object ): + """ + Base class for all data providers. Data providers: + (a) have a source (which must be another file-like object) + (b) implement both the iterator and context manager interfaces + (c) do not allow write methods + (but otherwise implement the other file object interface methods) + """ + def __init__( self, source, **kwargs ): + """ + :param source: the source that this iterator will loop over. + (Should implement the iterable interface and ideally have the + context manager interface as well) + """ + self.source = self.validate_source( source ) + + def validate_source( self, source ): + """ + Is this a valid source for this provider? + + :raises InvalidDataProviderSource: if the source is considered invalid. + + Meant to be overridden in subclasses. + """ + if not source or not hasattr( source, '__iter__' ): + # that's by no means a thorough check + raise exceptions.InvalidDataProviderSource( source ) + return source + + #TODO: (this might cause problems later...) + #TODO: some providers (such as chunk's seek and read) rely on this... remove + def __getattr__( self, name ): + if name == 'source': + # if we're inside this fn, source hasn't been set - provide some safety just for this attr + return None + # otherwise, try to get the attr from the source - allows us to get things like provider.encoding, etc. 
+ if hasattr( self.source, name ): + return getattr( self.source, name ) + # raise the proper error + return self.__getattribute__( name ) + + # write methods should not be allowed + def truncate( self, size ): + raise NotImplementedError( 'Write methods are purposely disabled' ) + def write( self, string ): + raise NotImplementedError( 'Write methods are purposely disabled' ) + def writelines( self, sequence ): + raise NotImplementedError( 'Write methods are purposely disabled' ) + + #TODO: route read methods through next? + #def readline( self ): + # return self.next() + def readlines( self ): + return [ line for line in self ] + + # iterator interface + def __iter__( self ): + # it's generators all the way up, Timmy + with self as source: + for datum in self.source: + yield datum + def next( self ): + return self.source.next() + + # context manager interface + def __enter__( self ): + # make the source's context manager interface optional + if hasattr( self.source, '__enter__' ): + self.source.__enter__() + return self + def __exit__( self, *args ): + # make the source's context manager interface optional, call on source if there + if hasattr( self.source, '__exit__' ): + self.source.__exit__( *args ) + # alternately, call close() + elif hasattr( self.source, 'close' ): + self.source.close() + + def __str__( self ): + """ + String representation for easier debugging. + + Will call `__str__` on it's source so this will display piped dataproviders. + """ + # we need to protect against recursion (in __getattr__) if self.source hasn't been set + source_str = str( self.source ) if hasattr( self, 'source' ) else '' + return '%s(%s)' %( self.__class__.__name__, str( source_str ) ) + + +class FilteredDataProvider( DataProvider ): + """ + Passes each datum through a filter function and yields it if that function + returns a non-`None` value. + + Also maintains counters: + - `num_data_read`: how many data have been consumed from the source. + - `num_valid_data_read`: how many data have been returned from `filter`. + - `num_data_returned`: how many data has this provider yielded. + """ + def __init__( self, source, filter_fn=None, **kwargs ): + """ + :param filter_fn: a lambda or function that will be passed a datum and + return either the (optionally modified) datum or None. + """ + super( FilteredDataProvider, self ).__init__( source, **kwargs ) + self.filter_fn = filter_fn + # count how many data we got from the source + self.num_data_read = 0 + # how many valid data have we gotten from the source + # IOW, data that's passed the filter and been either provided OR have been skipped due to offset + self.num_valid_data_read = 0 + # how many lines have been provided/output + self.num_data_returned = 0 + + def __iter__( self ): + parent_gen = super( FilteredDataProvider, self ).__iter__() + for datum in parent_gen: + self.num_data_read += 1 + datum = self.filter( datum ) + if datum != None: + self.num_valid_data_read += 1 + self.num_data_returned += 1 + yield datum + + #TODO: may want to squash this into DataProvider + def filter( self, datum ): + """ + When given a datum from the provider's source, return None if the datum + 'does not pass' the filter or is invalid. Return the datum if it's valid. + + :param datum: the datum to check for validity. + :returns: the datum, a modified datum, or None + + Meant to be overridden. 
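# a small sketch of the filter_fn form (the alternative to overriding this method): return the
# datum, possibly modified, to keep it, or None to drop it. Hypothetical file path.
def keep_long( datum ):
    return datum if len( datum ) > 10 else None
provider = FilteredDataProvider( open( '/tmp/example.txt' ), filter_fn=keep_long )
long_lines = list( provider )
# afterwards num_data_read counts every line consumed, num_valid_data_read only the kept ones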
+ """ + if self.filter_fn: + return self.filter_fn( datum ) + # also can be overriden entirely + return datum + + +class LimitedOffsetDataProvider( FilteredDataProvider ): + """ + A provider that uses the counters from FilteredDataProvider to limit the + number of data and/or skip `offset` number of data before providing. + + Useful for grabbing sections from a source (e.g. pagination). + """ + #TODO: may want to squash this into DataProvider + def __init__( self, source, offset=0, limit=None, **kwargs ): + """ + :param offset: the number of data to skip before providing. + :param limit: the final number of data to provide. + """ + super( LimitedOffsetDataProvider, self ).__init__( source, **kwargs ) + + # how many valid data to skip before we start outputing data - must be positive + # (diff to support neg. indeces - must be pos.) + self.offset = max( offset, 0 ) + + # how many valid data to return - must be positive (None indicates no limit) + self.limit = limit + if self.limit != None: + self.limit = max( self.limit, 0 ) + + def __iter__( self ): + """ + Iterate over the source until `num_valid_data_read` is greater than + `offset`, begin providing datat, and stop when `num_data_returned` + is greater than `offset`. + """ + parent_gen = super( LimitedOffsetDataProvider, self ).__iter__() + for datum in parent_gen: + + if self.limit != None and self.num_data_returned > self.limit: + break + + if self.num_valid_data_read > self.offset: + yield datum + else: + # wot a cheezy way of doing this... + self.num_data_returned -= 1 + + #TODO: skipping lines is inefficient - somehow cache file position/line_num pair and allow provider + # to seek to a pos/line and then begin providing lines + # the important catch here is that we need to have accurate pos/line pairs + # in order to preserve the functionality of limit and offset + #if file_seek and len( file_seek ) == 2: + # seek_pos, new_line_num = file_seek + # self.seek_and_set_curr_line( seek_pos, new_line_num ) + + #def seek_and_set_curr_line( self, file_seek, new_curr_line_num ): + # self.seek( file_seek, os.SEEK_SET ) + # self.curr_line_num = new_curr_line_num + + +class MultiSourceDataProvider( DataProvider ): + """ + A provider that iterates over a list of given sources and provides data + from one after another. + + An iterator over iterators. + """ + def __init__( self, source_list, **kwargs ): + """ + :param source_list: an iterator of iterables + """ + self.source_list = deque( source_list ) + + def __iter__( self ): + """ + Iterate over the source_list, then iterate over the data in each source. + + Skip a given source in `source_list` if it is `None` or invalid. + """ + for source in self.source_list: + # just skip falsy sources + if not source: + continue + try: + self.source = self.validate_source( source ) + except exceptions.InvalidDataProviderSource, invalid_source: + continue + + parent_gen = super( MultiSourceDataProvider, self ).__iter__() + for datum in parent_gen: + yield datum diff -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 -r 8df7f1c474896ed75557efc6f3359d09c3772c26 lib/galaxy/datatypes/dataproviders/chunk.py --- /dev/null +++ b/lib/galaxy/datatypes/dataproviders/chunk.py @@ -0,0 +1,80 @@ +""" +Chunk (N number of bytes at M offset to a source's beginning) provider. + +Primarily for file sources but usable by any iterator that has both +seek and read( N ). 
+""" +import os +import base64 + +import base +import exceptions + +_TODO = """ +""" + +import logging +log = logging.getLogger( __name__ ) + + +# ----------------------------------------------------------------------------- +class ChunkDataProvider( base.DataProvider ): + """ + Data provider that yields chunks of data from it's file. + + Note: this version does not account for lines and works with Binary datatypes. + """ + MAX_CHUNK_SIZE = 2**16 + DEFAULT_CHUNK_SIZE = MAX_CHUNK_SIZE + + #TODO: subclass from LimitedOffsetDataProvider? + # see web/framework/base.iterate_file, util/__init__.file_reader, and datatypes.tabular + def __init__( self, source, chunk_index=0, chunk_size=DEFAULT_CHUNK_SIZE, **kwargs ): + """ + :param chunk_index: if a source can be divided into N number of + `chunk_size` sections, this is the index of which section to + return. + :param chunk_size: how large are the desired chunks to return + (gen. in bytes). + """ + super( ChunkDataProvider, self ).__init__( source, **kwargs ) + self.chunk_size = chunk_size + self.chunk_pos = chunk_index * self.chunk_size + + def validate_source( self, source ): + """ + Does the given source have both the methods `seek` and `read`? + :raises InvalidDataProviderSource: if not. + """ + source = super( ChunkDataProvider, self ).validate_source( source ) + if( ( not hasattr( source, 'seek' ) ) + or ( not hasattr( source, 'read' ) ) ): + raise exceptions.InvalidDataProviderSource( source ) + return source + + def __iter__( self ): + # not reeeally an iterator per se + self.__enter__() + self.source.seek( self.chunk_pos, os.SEEK_SET ) + chunk = self.encode( self.source.read( self.chunk_size ) ) + yield chunk + self.__exit__() + + def encode( self, chunk ): + """ + Called on the chunk before returning. + + Overrride to modify, encode, or decode chunks. + """ + return chunk + + +class Base64ChunkDataProvider( ChunkDataProvider ): + """ + Data provider that yields chunks of base64 encoded data from it's file. + """ + def encode( self, chunk ): + """ + Return chunks encoded in base 64. + """ + return base64.b64encode( chunk ) diff -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 -r 8df7f1c474896ed75557efc6f3359d09c3772c26 lib/galaxy/datatypes/dataproviders/column.py --- /dev/null +++ b/lib/galaxy/datatypes/dataproviders/column.py @@ -0,0 +1,242 @@ +""" +Providers that provide lists of lists generally where each line of a source +is further subdivided into multiple data (e.g. columns from a line). +""" + +import line + +_TODO = """ +move ColumnarDataProvider parsers to more sensible location + +TransposedColumnarDataProvider: provides each column as a single array + - see existing visualizations/dataprovider/basic.ColumnDataProvider +""" + +import logging +log = logging.getLogger( __name__ ) + + +# ----------------------------------------------------------------------------- base classes +class ColumnarDataProvider( line.RegexLineDataProvider ): + """ + Data provider that provide a list of columns from the lines of it's source. + + Columns are returned in the order given in indeces, so this provider can + re-arrange columns. + + If any desired index is outside the actual number of columns + in the source, this provider will None-pad the output and you are guaranteed + the same number of columns as the number of indeces asked for (even if they + are filled with None). 
+ """ + def __init__( self, source, indeces=None, + column_count=None, column_types=None, parsers=None, parse_columns=True, + deliminator='\t', **kwargs ): + """ + :param indeces: a list of indeces of columns to gather from each row + Optional: will default to `None`. + If `None`, this provider will return all rows (even when a + particular row contains more/less than others). + If a row/line does not contain an element at a given index, the + provider will-return/fill-with a `None` value as the element. + :type indeces: list or None + + :param column_count: an alternate means of defining indeces, use an int + here to effectively provide the first N columns. + Optional: will default to `None`. + :type column_count: int + + :param column_types: a list of string names of types that the + provider will use to look up an appropriate parser for the column. + (e.g. 'int', 'float', 'str', 'bool') + Optional: will default to parsing all columns as strings. + :type column_types: list of strings + + :param parsers: a dictionary keyed with column type strings + and with values that are functions to use when parsing those + types. + Optional: will default to using the function `_get_default_parsers`. + :type parsers: dictionary + + :param parse_columns: attempt to parse columns? + Optional: defaults to `True`. + :type parse_columns: bool + + :param deliminator: character(s) used to split each row/line of the source. + Optional: defaults to the tab character. + :type deliminator: str + + .. note: that the subclass constructors are passed kwargs - so they're + params (limit, offset, etc.) are also applicable here. + """ + #TODO: other columnar formats: csv, etc. + super( ColumnarDataProvider, self ).__init__( source, **kwargs ) + + #IMPLICIT: if no indeces, column_count, or column_types passed: return all columns + self.selected_column_indeces = indeces + self.column_count = column_count + self.column_types = column_types or [] + # if no column count given, try to infer from indeces or column_types + if not self.column_count: + if self.selected_column_indeces: + self.column_count = len( self.selected_column_indeces ) + elif self.column_types: + self.column_count = len( self.column_types ) + # if no indeces given, infer from column_count + if not self.selected_column_indeces and self.column_count: + self.selected_column_indeces = list( xrange( self.column_count ) ) + + self.deliminator = deliminator + + # how/whether to parse each column value + self.parsers = {} + if parse_columns: + self.parsers = self._get_default_parsers() + # overwrite with user desired parsers + self.parsers.update( parsers or {} ) + + def _get_default_parsers( self ): + """ + Return parser dictionary keyed for each columnar type + (as defined in datatypes). + + .. note: primitives only by default (str, int, float, boolean, None). + Other (more complex) types are retrieved as strings. + :returns: a dictionary of the form: + `{ <parser type name> : <function used to parse type> }` + """ + #TODO: move to module level (or datatypes, util) + return { + # str is default and not needed here + 'int' : int, + 'float' : float, + 'bool' : bool, + + # unfortunately, 'list' is used in dataset metadata both for + # query style maps (9th col gff) AND comma-sep strings. + # (disabled for now) + #'list' : lambda v: v.split( ',' ), + #'csv' : lambda v: v.split( ',' ), + ## i don't like how urlparses does sub-lists... 
+ #'querystr' : lambda v: dict([ ( p.split( '=', 1 ) if '=' in p else ( p, True ) ) + # for p in v.split( ';', 1 ) ]) + + #'scifloat': #floating point which may be in scientific notation + + # always with the 1 base, biologists? + #'int1' : ( lambda i: int( i ) - 1 ), + + #'gffval': string or '.' for None + #'gffint': # int or '.' for None + #'gffphase': # 0, 1, 2, or '.' for None + #'gffstrand': # -, +, ?, or '.' for None, etc. + } + + def _parse_value( self, val, type ): + """ + Attempt to parse and return the given value based on the given type. + + :param val: the column value to parse (often a string) + :param type: the string type 'name' used to find the appropriate parser + :returns: the parsed value + or `value` if no `type` found in `parsers` + or `None` if there was a parser error (ValueError) + """ + if type == 'str' or type == None: return val + try: + return self.parsers[ type ]( val ) + except KeyError, err: + # no parser - return as string + pass + except ValueError, err: + # bad value - return None + return None + return val + + def _get_column_type( self, index ): + """ + Get the column type for the parser from `self.column_types` or `None` + if the type is unavailable. + :param index: the column index + :returns: string name of type (e.g. 'float', 'int', etc.) + """ + try: + return self.column_types[ index ] + except IndexError, ind_err: + return None + + def _parse_column_at_index( self, columns, parser_index, index ): + """ + Get the column type for the parser from `self.column_types` or `None` + if the type is unavailable. + """ + try: + return self._parse_value( columns[ index ], self._get_column_type( parser_index ) ) + # if a selected index is not within columns, return None + except IndexError, index_err: + return None + + def _parse_columns_from_line( self, line ): + """ + Returns a list of the desired, parsed columns. + :param line: the line to parse + :type line: str + """ + #TODO: too much going on in this loop - the above should all be precomputed AMAP... + all_columns = line.split( self.deliminator ) + # if no indeces were passed to init, return all columns + selected_indeces = self.selected_column_indeces or list( xrange( len( all_columns ) ) ) + parsed_columns = [] + for parser_index, column_index in enumerate( selected_indeces ): + parsed_columns.append( self._parse_column_at_index( all_columns, parser_index, column_index ) ) + return parsed_columns + + def __iter__( self ): + parent_gen = super( ColumnarDataProvider, self ).__iter__() + for line in parent_gen: + columns = self._parse_columns_from_line( line ) + yield columns + + #TODO: implement column filters here and not below - flatten hierarchy + +class FilteredByColumnDataProvider( ColumnarDataProvider ): + """ + Data provider that provide a list of columns from the lines of it's source + _only_ if they pass a given filter function. + + e.g. column #3 is type int and > N + """ + # TODO: how to do this and still have limit and offset work? + def __init__( self, source, **kwargs ): + raise NotImplementedError() + super( FilteredByColumnDataProvider, self ).__init__( source, **kwargs ) + + +class MapDataProvider( ColumnarDataProvider ): + """ + Data provider that column_names and columns from the source's contents + into a dictionary. + + A combination use of both `column_names` and `indeces` allows 'picking' + key/value pairs from the source. + + .. note: that the subclass constructors are passed kwargs - so they're + params (limit, offset, etc.) are also applicable here. 
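# a short sketch of the 'picking' described above: pairing indeces with column_names yields
# dictionaries (plain-list source; keywords as defined in __init__ below):
lines = [ 'chr1\t100\t200\tfeature1' ]
provider = MapDataProvider( lines, indeces=[ 0, 3 ], column_names=[ 'chrom', 'name' ] )
list( provider )   # -> [ { 'chrom': 'chr1', 'name': 'feature1' } ]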
+ """ + def __init__( self, source, column_names=None, **kwargs ): + """ + :param column_names: an ordered list of strings that will be used as the keys + for each column in the returned dictionaries. + The number of key, value pairs each returned dictionary has will + be as short as the number of column names provided. + :type column_names: + """ + #TODO: allow passing in a map instead of name->index { 'name1': index1, ... } + super( MapDataProvider, self ).__init__( source, **kwargs ) + self.column_names = column_names or [] + + def __iter__( self ): + parent_gen = super( MapDataProvider, self ).__iter__() + for column_values in parent_gen: + map = dict( zip( self.column_names, column_values ) ) + yield map diff -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 -r 8df7f1c474896ed75557efc6f3359d09c3772c26 lib/galaxy/datatypes/dataproviders/dataset.py --- /dev/null +++ b/lib/galaxy/datatypes/dataproviders/dataset.py @@ -0,0 +1,671 @@ +""" +Dataproviders that use either: + - the file contents and/or metadata from a Galaxy DatasetInstance as + their source. + - or provide data in some way relevant to bioinformatic data + (e.g. parsing genomic regions from their source) +""" + +import pkg_resources +pkg_resources.require( 'bx-python' ) +from bx import seq as bx_seq +from bx import wiggle as bx_wig + +import galaxy.model +import galaxy.datatypes +import galaxy.datatypes.data + +#TODO: can't import these due to circular ref in model/registry +#import galaxy.datatypes.binary +#import galaxy.datatypes.tabular + +import exceptions +import base +import line +import column +import external + +_TODO = """ +use bx as much as possible +the use of DatasetInstance seems to create some import problems +gff3 hierarchies +""" + +import logging +log = logging.getLogger( __name__ ) + + +# ----------------------------------------------------------------------------- base for using a Glx dataset +class DatasetDataProvider( base.DataProvider ): + """ + Class that uses the file contents and/or metadata from a Galaxy DatasetInstance + as it's source. + + DatasetDataProvider can be seen as the intersection between a datatype's + metadata and a dataset's file contents. It (so far) mainly provides helper + and conv. methods for using dataset metadata to set up and control how + the data is provided. + """ + def __init__( self, dataset, **kwargs ): + """ + :param dataset: the Galaxy dataset whose file will be the source + :type dataset: model.DatasetInstance + + :raises exceptions.InvalidDataProviderSource: if not a DatsetInstance + """ + if not isinstance( dataset, galaxy.model.DatasetInstance ): + raise exceptions.InvalidDataProviderSource( "Data provider can only be used with a DatasetInstance" ) + self.dataset = dataset + # this dataset file is obviously the source + #TODO: this might be a good place to interface with the object_store... + super( DatasetDataProvider, self ).__init__( open( dataset.file_name, 'rb' ) ) + + #TODO: this is a bit of a mess + @classmethod + def get_column_metadata_from_dataset( cls, dataset ): + """ + Convenience class method to get column metadata from a dataset. + :returns: a dictionary of `column_count`, `column_types`, and `column_names` + if they're available, setting each to `None` if not. 
+ """ + # re-map keys to fit ColumnarProvider.__init__ kwargs + params = {} + params[ 'column_count' ] = dataset.metadata.columns + params[ 'column_types' ] = dataset.metadata.column_types + params[ 'column_names' ] = dataset.metadata.column_names or getattr( dataset.datatype, 'column_names', None ) + return params + + def get_metadata_column_types( self, indeces=None ): + """ + Return the list of `column_types` for this dataset or `None` if unavailable. + :param indeces: the indeces for the columns of which to return the types. + Optional: defaults to None (return all types) + :type indeces: list of ints + """ + metadata_column_types = ( self.dataset.metadata.column_types + or getattr( self.dataset.datatype, 'column_types', None ) + or None ) + if not metadata_column_types: + return metadata_column_types + if indeces: + column_types = [] + for index in indeces: + column_type = metadata_column_types[ index ] if index < len( metadata_column_types ) else None + column_types.append( column_type ) + return column_types + return metadata_column_types + + def get_metadata_column_names( self, indeces=None ): + """ + Return the list of `column_names` for this dataset or `None` if unavailable. + :param indeces: the indeces for the columns of which to return the names. + Optional: defaults to None (return all names) + :type indeces: list of ints + """ + metadata_column_names = ( self.dataset.metadata.column_names + or getattr( self.dataset.datatype, 'column_names', None ) + or None ) + if not metadata_column_names: + return metadata_column_names + if indeces: + column_names = [] + for index in indeces: + column_type = metadata_column_names[ index ] if index < len( metadata_column_names ) else None + column_names.append( column_type ) + return column_names + return metadata_column_names + + #TODO: merge the next two + def get_indeces_by_column_names( self, list_of_column_names ): + """ + Return the list of column indeces when given a list of column_names. + :param list_of_column_names: the names of the columns of which to get indeces. + :type list_of_column_names: list of strs + :raises KeyError: if column_names are not found + :raises ValueError: if an entry in list_of_column_names is not in column_names + """ + metadata_column_names = ( self.dataset.metadata.column_names + or getattr( self.dataset.datatype, 'column_names', None ) + or None ) + if not metadata_column_names: + raise KeyError( 'No column_names found for ' + + 'datatype: %s, dataset: %s' %( str( self.dataset.datatype ), str( self.dataset ) ) ) + indeces = [] + for column_name in list_of_column_names: + indeces.append( metadata_column_names.index( column_name ) ) + return indeces + + def get_metadata_column_index_by_name( self, name ): + """ + Return the 1-base index of a sources column with the given `name`. + """ + # metadata columns are 1-based indeces + column = getattr( self.dataset.metadata, name ) + return ( column - 1 ) if isinstance( column, int ) else None + + def get_genomic_region_indeces( self, check=False ): + """ + Return a list of column indeces for 'chromCol', 'startCol', 'endCol' from + a source representing a genomic region. + + :param check: if True will raise a ValueError if any were not found. + :type check: bool + :raises ValueError: if check is `True` and one or more indeces were not found. + :returns: list of column indeces for the named columns. 
+ """ + region_column_names = ( 'chromCol', 'startCol', 'endCol' ) + region_indeces = [ self.get_metadata_column_index_by_name( name ) for name in region_column_names ] + if check and not all( map( lambda i: i != None, indeces ) ): + raise ValueError( "Could not determine proper column indeces for chrom, start, end: %s" %( str( indeces ) ) ) + return region_indeces + + +class ConvertedDatasetDataProvider( DatasetDataProvider ): + """ + Class that uses the file contents of a dataset after conversion to a different + format. + """ + def __init__( self, dataset, **kwargs ): + raise NotImplementedError( 'Abstract class' ) + self.original_dataset = dataset + self.converted_dataset = self.convert_dataset( dataset, **kwargs ) + super( ConvertedDatasetDataProvider, self ).__init__( self.converted_dataset, **kwargs ) + #NOTE: now self.converted_dataset == self.dataset + + def convert_dataset( self, dataset, **kwargs ): + """ + Convert the given dataset in some way. + """ + return dataset + + +# ----------------------------------------------------------------------------- uses metadata for settings +class DatasetColumnarDataProvider( column.ColumnarDataProvider ): + """ + Data provider that uses a DatasetDataProvider as it's source and the + dataset's metadata to buuild settings for the ColumnarDataProvider it's + inherited from. + """ + def __init__( self, dataset, **kwargs ): + """ + All kwargs are inherited from ColumnarDataProvider. + .. seealso:: column.ColumnarDataProvider + + If no kwargs are given, this class will attempt to get those kwargs + from the dataset source's metadata. + If any kwarg is given, it will override and be used in place of + any metadata available. + """ + dataset_source = DatasetDataProvider( dataset ) + if not kwargs.get( 'column_types', None ): + indeces = kwargs.get( 'indeces', None ) + kwargs[ 'column_types' ] = dataset_source.get_metadata_column_types( indeces=indeces ) + super( DatasetColumnarDataProvider, self ).__init__( dataset_source, **kwargs ) + + +class DatasetMapDataProvider( column.MapDataProvider ): + """ + Data provider that uses a DatasetDataProvider as it's source and the + dataset's metadata to buuild settings for the MapDataProvider it's + inherited from. + """ + def __init__( self, dataset, **kwargs ): + """ + All kwargs are inherited from MapDataProvider. + .. seealso:: column.MapDataProvider + + If no kwargs are given, this class will attempt to get those kwargs + from the dataset source's metadata. + If any kwarg is given, it will override and be used in place of + any metadata available. 
+ + The relationship between column_names and indeces is more complex: + +-----------------+-------------------------------+-----------------------+ + | | Indeces given | Indeces NOT given | + +=================+===============================+=======================+ + | Names given | pull indeces, rename w/ names | pull by name | + +=================+-------------------------------+-----------------------+ + | Names NOT given | pull indeces, name w/ meta | pull all, name w/meta | + +=================+-------------------------------+-----------------------+ + """ + dataset_source = DatasetDataProvider( dataset ) + + #TODO: getting too complicated - simplify at some lvl, somehow + # if no column_types given, get column_types from indeces (or all if indeces == None) + indeces = kwargs.get( 'indeces', None ) + column_names = kwargs.get( 'column_names', None ) + + #if indeces and column_names: + # # pull using indeces and re-name with given names - no need to alter (does as super would) + # pass + + if not indeces and column_names: + # pull columns by name + indeces = kwargs[ 'indeces' ] = dataset_source.get_indeces_by_column_names( column_names ) + + elif indeces and not column_names: + # pull using indeces, name with meta + column_names = kwargs[ 'column_names' ] = dataset_source.get_metadata_column_names( indeces=indeces ) + + elif not indeces and not column_names: + # pull all indeces and name using metadata + column_names = kwargs[ 'column_names' ] = dataset_source.get_metadata_column_names( indeces=indeces ) + + # if no column_types given, use metadata column_types + if not kwargs.get( 'column_types', None ): + kwargs[ 'column_types' ] = dataset_source.get_metadata_column_types( indeces=indeces ) + + super( DatasetMapDataProvider, self ).__init__( dataset_source, **kwargs ) + + +# ----------------------------------------------------------------------------- provides a bio-relevant datum +class GenomicRegionDataProvider( column.ColumnarDataProvider ): + """ + Data provider that parses chromosome, start, and end data from a file + using the datasets metadata settings. + + Is a ColumnarDataProvider that uses a DatasetDataProvider as it's source. + + If `named_columns` is true, will return dictionaries with the keys + 'chrom', 'start', 'end'. + """ + # dictionary keys when named_columns=True + COLUMN_NAMES = [ 'chrom', 'start', 'end' ] + + def __init__( self, dataset, chrom_column=None, start_column=None, end_column=None, named_columns=False, **kwargs ): + """ + :param dataset: the Galaxy dataset whose file will be the source + :type dataset: model.DatasetInstance + + :param chrom_column: optionally specify the chrom column index + :type chrom_column: int + :param start_column: optionally specify the start column index + :type start_column: int + :param end_column: optionally specify the end column index + :type end_column: int + + :param named_columns: optionally return dictionaries keying each column + with 'chrom', 'start', or 'end'. + Optional: defaults to False + :type named_columns: bool + """ + #TODO: allow passing in a string format e.g. 
"{chrom}:{start}-{end}" + dataset_source = DatasetDataProvider( dataset ) + + if chrom_column == None: + chrom_column = dataset_source.get_metadata_column_index_by_name( 'chromCol' ) + if start_column == None: + start_column = dataset_source.get_metadata_column_index_by_name( 'startCol' ) + if end_column == None: + end_column = dataset_source.get_metadata_column_index_by_name( 'endCol' ) + indeces = [ chrom_column, start_column, end_column ] + if not all( map( lambda i: i != None, indeces ) ): + raise ValueError( "Could not determine proper column indeces for" + + " chrom, start, end: %s" %( str( indeces ) ) ) + kwargs.update({ 'indeces' : indeces }) + + if not kwargs.get( 'column_types', None ): + kwargs.update({ 'column_types' : dataset_source.get_metadata_column_types( indeces=indeces ) }) + + self.named_columns = named_columns + if self.named_columns: + self.column_names = self.COLUMN_NAMES + + super( GenomicRegionDataProvider, self ).__init__( dataset_source, **kwargs ) + + def __iter__( self ): + parent_gen = super( GenomicRegionDataProvider, self ).__iter__() + for column_values in parent_gen: + if self.named_columns: + yield dict( zip( self.column_names, column_values ) ) + else: + yield column_values + + +#TODO: this optionally provides the same data as the above and makes GenomicRegionDataProvider redundant +# GenomicRegionDataProvider is a better name, tho +class IntervalDataProvider( column.ColumnarDataProvider ): + """ + Data provider that parses chromosome, start, and end data (as well as strand + and name if set in the metadata) using the dataset's metadata settings. + + If `named_columns` is true, will return dictionaries with the keys + 'chrom', 'start', 'end' (and 'strand' and 'name' if available). + """ + COLUMN_NAMES = [ 'chrom', 'start', 'end', 'strand', 'name' ] + + def __init__( self, dataset, chrom_column=None, start_column=None, end_column=None, + strand_column=None, name_column=None, named_columns=False, **kwargs ): + """ + :param dataset: the Galaxy dataset whose file will be the source + :type dataset: model.DatasetInstance + + :param named_columns: optionally return dictionaries keying each column + with 'chrom', 'start', 'end', 'strand', or 'name'. + Optional: defaults to False + :type named_columns: bool + """ + #TODO: allow passing in a string format e.g. 
"{chrom}:{start}-{end}" + dataset_source = DatasetDataProvider( dataset ) + + # get genomic indeces and add strand and name + if chrom_column == None: + chrom_column = dataset_source.get_metadata_column_index_by_name( 'chromCol' ) + if start_column == None: + start_column = dataset_source.get_metadata_column_index_by_name( 'startCol' ) + if end_column == None: + end_column = dataset_source.get_metadata_column_index_by_name( 'endCol' ) + if strand_column == None: + strand_column = dataset_source.get_metadata_column_index_by_name( 'strandCol' ) + if name_column == None: + name_column = dataset_source.get_metadata_column_index_by_name( 'nameCol' ) + indeces = [ chrom_column, start_column, end_column, strand_column, name_column ] + kwargs.update({ 'indeces' : indeces }) + + if not kwargs.get( 'column_types', None ): + kwargs.update({ 'column_types' : dataset_source.get_metadata_column_types( indeces=indeces ) }) + + self.named_columns = named_columns + if self.named_columns: + self.column_names = self.COLUMN_NAMES + + super( IntervalDataProvider, self ).__init__( dataset_source, **kwargs ) + + def __iter__( self ): + parent_gen = super( IntervalDataProvider, self ).__iter__() + for column_values in parent_gen: + if self.named_columns: + yield dict( zip( self.column_names, column_values ) ) + else: + yield column_values + + +#TODO: ideally with these next two - you'd allow pulling some region from the sequence +# WITHOUT reading the entire seq into memory - possibly apply some version of limit/offset +class FastaDataProvider( base.FilteredDataProvider ): + """ + Class that returns fasta format data in a list of maps of the form: + { + id: <fasta header id>, + sequence: <joined lines of nucleotide/amino data> + } + """ + def __init__( self, source, ids=None, **kwargs ): + """ + :param ids: optionally return only ids (and sequences) that are in this list. + Optional: defaults to None (provide all ids) + :type ids: list or None + """ + source = bx_seq.fasta.FastaReader( source ) + #TODO: validate is a fasta + super( FastaDataProvider, self ).__init__( source, **kwargs ) + self.ids = ids + # how to do ids? + + def __iter__( self ): + parent_gen = super( FastaDataProvider, self ).__iter__() + for fasta_record in parent_gen: + yield { + 'id' : fasta_record.name, + 'seq' : fasta_record.text + } + + +class TwoBitFastaDataProvider( DatasetDataProvider ): + """ + Class that returns fasta format data in a list of maps of the form: + { + id: <fasta header id>, + sequence: <joined lines of nucleotide/amino data> + } + """ + def __init__( self, source, ids=None, **kwargs ): + """ + :param ids: optionally return only ids (and sequences) that are in this list. + Optional: defaults to None (provide all ids) + :type ids: list or None + """ + source = bx_seq.twobit.TwoBitFile( source ) + #TODO: validate is a 2bit + super( FastaDataProvider, self ).__init__( source, **kwargs ) + # could do in order provided with twobit + self.ids = ids or self.source.keys() + + def __iter__( self ): + for id_ in self.ids: + yield { + 'id' : id_, + 'seq' : self.source[ name ] + } + + +#TODO: +class WiggleDataProvider( base.LimitedOffsetDataProvider ): + """ + Class that returns chrom, pos, data from a wiggle source. + """ + COLUMN_NAMES = [ 'chrom', 'pos', 'value' ] + + def __init__( self, source, named_columns=False, column_names=None, **kwargs ): + """ + :param named_columns: optionally return dictionaries keying each column + with 'chrom', 'start', 'end', 'strand', or 'name'. 
+ Optional: defaults to False + :type named_columns: bool + + :param column_names: an ordered list of strings that will be used as the keys + for each column in the returned dictionaries. + The number of key, value pairs each returned dictionary has will + be as short as the number of column names provided. + :type column_names: + """ + #TODO: validate is a wig + # still good to maintain a ref to the raw source bc Reader won't + self.raw_source = source + self.parser = bx_wig.Reader( source ) + super( WiggleDataProvider, self ).__init__( self.parser, **kwargs ) + + self.named_columns = named_columns + self.column_names = column_names or self.COLUMN_NAMES + + def __iter__( self ): + parent_gen = super( WiggleDataProvider, self ).__iter__() + for three_tuple in parent_gen: + if self.named_columns: + yield dict( zip( self.column_names, three_tuple ) ) + else: + # list is not strictly necessary - but consistent + yield list( three_tuple ) + + +class BigWigDataProvider( base.LimitedOffsetDataProvider ): + """ + Class that returns chrom, pos, data from a wiggle source. + """ + COLUMN_NAMES = [ 'chrom', 'pos', 'value' ] + + def __init__( self, source, chrom, start, end, named_columns=False, column_names=None, **kwargs ): + """ + :param chrom: which chromosome within the bigbed file to extract data for + :type chrom: str + :param start: the start of the region from which to extract data + :type start: int + :param end: the end of the region from which to extract data + :type end: int + + :param named_columns: optionally return dictionaries keying each column + with 'chrom', 'start', 'end', 'strand', or 'name'. + Optional: defaults to False + :type named_columns: bool + + :param column_names: an ordered list of strings that will be used as the keys + for each column in the returned dictionaries. + The number of key, value pairs each returned dictionary has will + be as short as the number of column names provided. + :type column_names: + """ + raise NotImplementedError( 'Work in progress' ) + #TODO: validate is a wig + # still good to maintain a ref to the raw source bc Reader won't + self.raw_source = source + self.parser = bx_bbi.bigwig_file.BigWigFile( source ) + super( BigWigDataProvider, self ).__init__( self.parser, **kwargs ) + + self.named_columns = named_columns + self.column_names = column_names or self.COLUMN_NAMES + + def __iter__( self ): + parent_gen = super( BigWigDataProvider, self ).__iter__() + for three_tuple in parent_gen: + if self.named_columns: + yield dict( zip( self.column_names, three_tuple ) ) + else: + # list is not strictly necessary - but consistent + yield list( three_tuple ) + + +# ----------------------------------------------------------------------------- binary, external conversion or tool +class DatasetSubprocessDataProvider( external.SubprocessDataProvider ): + """ + Create a source from running a subprocess on a dataset's file. + + Uses a subprocess as it's source and has a dataset (gen. as an input file + for the process). + """ + #TODO: below should be a subclass of this and not RegexSubprocess + def __init__( self, dataset, *args, **kwargs ): + """ + :param args: the list of strings used to build commands. + :type args: variadic function args + """ + raise NotImplementedError( 'Abstract class' ) + super( DatasetSubprocessDataProvider, self ).__init__( *args, **kwargs ) + self.dataset = dataset + + +class SamtoolsDataProvider( line.RegexLineDataProvider ): + """ + Data provider that uses samtools on a Sam or Bam file as it's source. 
+ + This can be piped through other providers (column, map, genome region, etc.). + + .. note:: that only the samtools 'view' command is currently implemented. + """ + FLAGS_WO_ARGS = 'bhHSu1xXcB' + FLAGS_W_ARGS = 'fFqlrs' + VALID_FLAGS = FLAGS_WO_ARGS + FLAGS_W_ARGS + + def __init__( self, dataset, options_string='', options_dict=None, regions=None, **kwargs ): + """ + :param options_string: samtools options in string form (flags separated + by spaces) + Optional: defaults to '' + :type options_string: str + :param options_dict: dictionary of samtools options + Optional: defaults to None + :type options_dict: dict or None + :param regions: list of samtools regions strings + Optional: defaults to None + :type regions: list of str or None + """ + #TODO: into validate_source + + #TODO: have to import these here due to circular ref in model/datatypes + import galaxy.datatypes.binary + import galaxy.datatypes.tabular + if( not( isinstance( dataset.datatype, galaxy.datatypes.tabular.Sam ) + or isinstance( dataset.datatype, galaxy.datatypes.binary.Bam ) ) ): + raise exceptions.InvalidDataProviderSource( + 'dataset must be a Sam or Bam datatype: %s' %( str( dataset.datatype ) ) ) + self.dataset = dataset + + options_dict = options_dict or {} + # ensure regions are strings + regions = [ str( r ) for r in regions ] if regions else [] + + #TODO: view only for now + #TODO: not properly using overriding super's validate_opts, command here + subcommand = 'view' + #TODO:?? do we need a path to samtools? + subproc_args = self.build_command_list( subcommand, options_string, options_dict, regions ) +#TODO: the composition/inheritance here doesn't make a lot sense + subproc_provider = external.SubprocessDataProvider( *subproc_args ) + super( SamtoolsDataProvider, self ).__init__( subproc_provider, **kwargs ) + + def build_command_list( self, subcommand, options_string, options_dict, regions ): + """ + Convert all init args to list form. + """ + command = [ 'samtools', subcommand ] + # add options and switches, input file, regions list (if any) + command.extend( self.to_options_list( options_string, options_dict ) ) + command.append( self.dataset.file_name ) + command.extend( regions ) + return command + + def to_options_list( self, options_string, options_dict ): + """ + Convert both options_string and options_dict to list form + while filtering out non-'valid' options. + """ + opt_list = [] + + # strip out any user supplied bash switch formating -> string of option chars + # then compress to single option string of unique, VALID flags with prefixed bash switch char '-' + options_string = options_string.strip( '- ' ) + validated_flag_list = set([ flag for flag in options_string if flag in self.FLAGS_WO_ARGS ]) + + # if sam add -S + if( ( isinstance( self.dataset.datatype, galaxy.datatypes.tabular.Sam ) + and ( 'S' not in validated_flag_list ) ) ): + validated_flag_list.append( 'S' ) + + if validated_flag_list: + opt_list.append( '-' + ''.join( validated_flag_list ) ) + + for flag, arg in options_dict.items(): + if flag in self.FLAGS_W_ARGS: + opt_list.extend([ '-' + flag, str( arg ) ]) + + return opt_list + + @classmethod + def extract_options_from_dict( cls, dictionary ): + """ + Separrates valid samtools key/value pair options from a dictionary and + returns both as a 2-tuple. + """ + # handy for extracting options from kwargs - but otherwise... 
+ #TODO: could be abstracted to util.extract( dict, valid_keys_list ) + options_dict = {} + new_kwargs = {} + for key, value in dictionary.items(): + if key in cls.FLAGS_W_ARGS: + options_dict[ key ] = value + else: + new_kwargs[ key ] = value + return options_dict, new_kwargs + + +class BcftoolsDataProvider( line.RegexLineDataProvider ): + """ + Data provider that uses an bcftools on a bcf (or vcf?) file as it's source. + + This can be piped through other providers (column, map, genome region, etc.). + """ + def __init__( self, dataset, **kwargs ): + #TODO: as samtools + raise NotImplementedError() + super( BCFDataProvider, self ).__init__( dataset, **kwargs ) + + +class BGzipTabixDataProvider( base.DataProvider ): + """ + Data provider that uses an g(un)zip on a file as it's source. + + This can be piped through other providers (column, map, genome region, etc.). + """ + def __init__( self, dataset, **kwargs ): + #TODO: as samtools - need more info on output format + raise NotImplementedError() + super( BGzipTabixDataProvider, self ).__init__( dataset, **kwargs ) diff -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 -r 8df7f1c474896ed75557efc6f3359d09c3772c26 lib/galaxy/datatypes/dataproviders/decorators.py --- /dev/null +++ b/lib/galaxy/datatypes/dataproviders/decorators.py @@ -0,0 +1,107 @@ +""" +DataProvider related decorators. +""" + +# I'd like to decorate the factory methods that give data_providers by the name they can be accessed from. e.g.: +#@provides( 'id_seq' ) # where 'id_seq' is some 'data_format' string/alias +#def get_id_seq_provider( dataset, **settings ): + +# then in some central dispatch (maybe data.Data), have it look up the proper method by the data_format string + +# also it would be good to have this decorator maintain a list of available providers (for a datatype) + +# i don't particularly want to cut up method names ( get_([\w_]*)_provider ) +#!/usr/bin/env python + +# adapted from: http://stackoverflow.com +# /questions/14095616/python-can-i-programmatically-decorate-class-methods-from-a-class-instance + +from functools import wraps +#from types import MethodType +import copy + +import logging +log = logging.getLogger( __name__ ) + + +# ----------------------------------------------------------------------------- +_DATAPROVIDER_CLASS_MAP_KEY = 'dataproviders' +_DATAPROVIDER_METHOD_NAME_KEY = '_dataprovider_name' + +# ----------------------------------------------------------------------------- +def has_dataproviders( cls ): + """ + Wraps a class (generally a Datatype), finds methods within that have been + decorated with `@dataprovider` and adds them, by their name, to a map + in the class. + + This allows a class to maintain a name -> method map, effectively + 'registering' dataprovider factory methods. + + .. example:: + @has_dataproviders + class MyDtype( data.Data ): + + @dataprovider_factory( 'bler' ) + def provide_some_bler( self, dataset, **settings ): + '''blerblerbler''' + dataset_source = providers.DatasetDataProvider( dataset ) + # ... chain other, intermidiate providers here + return providers.BlerDataProvider( dataset_source, **settings ) + + # use the base method in data.Data + provider = dataset.datatype.dataprovider( dataset, 'bler', + my_setting='blah', ... ) + # OR directly from the map + provider = dataset.datatype.dataproviders[ 'bler' ]( dataset, + my_setting='blah', ... ) + """ + #log.debug( 'has_dataproviders:', cls ) + # init the class dataproviders map if necc. 
+ if not hasattr( cls, _DATAPROVIDER_CLASS_MAP_KEY ): + setattr( cls, _DATAPROVIDER_CLASS_MAP_KEY, {} ) + else: + # need to deepcopy or subclasses will modify super.dataproviders as well + existing_dataproviders = getattr( cls, _DATAPROVIDER_CLASS_MAP_KEY ) + copied_dataproviders = copy.deepcopy( existing_dataproviders ) + setattr( cls, _DATAPROVIDER_CLASS_MAP_KEY, copied_dataproviders ) + + dataproviders = getattr( cls, _DATAPROVIDER_CLASS_MAP_KEY ) + + # scan for methods with dataprovider names and add them to the map + # note: this has a 'cascading' effect + # where it's possible to override a super's provider with a sub's + for attr_key, attr_value in cls.__dict__.iteritems(): + #log.debug( '\t key:', attr_key ) + # can't use isinstance( attr_value, MethodType ) bc of wrapping + if( ( callable( attr_value ) ) + and ( not attr_key.startswith( "__" ) ) + and ( getattr( attr_value, _DATAPROVIDER_METHOD_NAME_KEY, None ) ) ): + #log.debug( '\t\t is a dataprovider', attr_key ) + name = getattr( attr_value, _DATAPROVIDER_METHOD_NAME_KEY ) + dataproviders[ name ] = attr_value + + #log.debug( 'dataproviders:' ) + #for name, fn in cls.dataproviders.items(): + # log.debug( '\t ', name, '->', fn.__name__, fn ) + # log.debug( '\t\t ', fn.__doc__ ) + return cls + +def dataprovider_factory( name ): + """ + Wraps a class method and marks it as a dataprovider factory. + + :param name: what name/key to register the factory under in `cls.dataproviders` + :param type: any hashable var + """ + #log.debug( 'dataprovider:', name ) + def named_dataprovider_factory( func ): + #log.debug( 'named_dataprovider_factory:', name, '->', func.__name__ ) + setattr( func, _DATAPROVIDER_METHOD_NAME_KEY, name ) + #log.debug( '\t setting:', getattr( func, _DATAPROVIDER_METHOD_NAME_KEY ) ) + @wraps( func ) + def wrapped_dataprovider_factory( self, *args, **kwargs ): + #log.debug( 'wrapped_dataprovider_factory', name, self, args, kwargs ) + return func( self, *args, **kwargs ) + return wrapped_dataprovider_factory + return named_dataprovider_factory diff -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 -r 8df7f1c474896ed75557efc6f3359d09c3772c26 lib/galaxy/datatypes/dataproviders/exceptions.py --- /dev/null +++ b/lib/galaxy/datatypes/dataproviders/exceptions.py @@ -0,0 +1,33 @@ +""" +DataProvider related exceptions. +""" + +class InvalidDataProviderSource( TypeError ): + """ + Raised when a unusable source is passed to a provider. + """ + def __init__( self, source=None, msg='' ): + msg = msg or 'Invalid source for provider: %s' %( source ) + super( InvalidDataProviderSource, self ).__init__( msg ) + + +class NoProviderAvailable( TypeError ): + """ + Raised when no provider is found for the given `format_requested`. + + :param factory_source: the item that the provider was requested from + :param format_requested: the format_requested (a hashable key to access + `factory_source.datatypes` with) + + Both params are attached to this class and accessible to the try-catch + receiver. + + Meant to be used within a class that builds dataproviders (e.g. 
a Datatype) + """ + def __init__( self, factory_source, format_requested=None, msg='' ): + self.factory_source = factory_source + self.format_requested = format_requested + msg = msg or 'No provider available in factory_source "%s" for format requested' %( str( factory_source ) ) + if self.format_requested: + msg += ': "%s"' %( self.format_requested ) + super( NoProviderAvailable, self ).__init__( msg ) diff -r 159fadd1ee0c379992d1ebd625f4b8e7b25cfb81 -r 8df7f1c474896ed75557efc6f3359d09c3772c26 lib/galaxy/datatypes/dataproviders/external.py --- /dev/null +++ b/lib/galaxy/datatypes/dataproviders/external.py @@ -0,0 +1,165 @@ +""" +Data providers that iterate over a source that is not in memory +or not in a file. +""" + +import subprocess +import urllib, urllib2 +import gzip + +import base +import line + +_TODO = """ +YAGNI: ftp, image, cryptos, sockets +job queue +admin: admin server log rgx/stats, ps aux +""" + +import logging +log = logging.getLogger( __name__ ) + + +# ----------------------------------------------------------------------------- server subprocess / external prog +class SubprocessDataProvider( base.DataProvider ): + """ + Data provider that uses the output from an intermediate program and + subprocess as it's data source. + """ + #TODO: need better ways of checking returncode, stderr for errors and raising + def __init__( self, *args, **kwargs ): + """ + :param args: the list of strings used to build commands. + :type args: variadic function args + """ + self.exit_code = None + command_list = args + self.popen = self.subprocess( *command_list, **kwargs ) + #TODO:?? not communicate()? + super( SubprocessDataProvider, self ).__init__( self.popen.stdout ) + self.exit_code = self.popen.poll() + + #NOTE: there's little protection here v. sending a ';' and a dangerous command here + # but...we're all adults here, right? ...RIGHT?! + def subprocess( self, *command_list, **kwargs ): + """ + :param args: the list of strings used as commands. + :type args: variadic function args + """ + try: + # how expensive is this? + popen = subprocess.Popen( command_list, stderr=subprocess.PIPE, stdout=subprocess.PIPE ) + log.info( 'opened subrocess (%s), PID: %s' %( str( command_list ), str( popen.pid ) ) ) + #log.debug( 'stderr:\n%s\n' %( popen.stderr.read() ) ) + + except OSError, os_err: + command_str = ' '.join( self.command ) + raise OSError( ' '.join([ str( os_err ), ':', command_str ]) ) + + return popen + + def __exit__( self, *args ): + # poll the subrocess for an exit code + self.exit_code = self.popen.poll() + log.info( '%s.__exit__, exit_code: %s' %( str( self ), str( self.exit_code ) ) ) + return super( SubprocessDataProvider, self ).__exit__( *args ) + + def __str__( self ): + # provide the pid and current return code + source_str = '' + if hasattr( self, 'popen' ): + source_str = '%s:%s' %( str( self.popen.pid ), str( self.popen.poll() ) ) + return '%s(%s)' %( self.__class__.__name__, str( source_str ) ) + + +class RegexSubprocessDataProvider( line.RegexLineDataProvider ): + """ + RegexLineDataProvider that uses a SubprocessDataProvider as it's data source. + """ + # this is a conv. class and not really all that necc... + def __init__( self, *args, **kwargs ): + # using subprocess as proxy data source in filtered line prov. 
+ subproc_provider = SubprocessDataProvider( *args ) + super( RegexSubprocessDataProvider, self ).__init__( subproc_provider, **kwargs ) + + +# ----------------------------------------------------------------------------- other apis +class URLDataProvider( base.DataProvider ): + """ + Data provider that uses the contents of a URL for it's data source. + + This can be piped through other providers (column, map, genome region, etc.). + """ + VALID_METHODS = ( 'GET', 'POST' ) + + def __init__( self, url, method='GET', data=None, **kwargs ): + """ + :param url: the base URL to open. + :param method: the HTTP method to use. + Optional: defaults to 'GET' + :param data: any data to pass (either in query for 'GET' + or as post data with 'POST') + :type data: dict + """ + self.url = url + self.method = method + + self.data = data or {} + encoded_data = urllib.urlencode( self.data ) + + if method == 'GET': + self.url += '?%s' %( encoded_data ) + opened = urllib2.urlopen( url ) + elif method == 'POST': + opened = urllib2.urlopen( url, encoded_data ) + else: + raise ValueError( 'Not a valid method: %s' %( method ) ) + + super( URLDataProvider, self ).__init__( opened, **kwargs ) + #NOTE: the request object is now accessible as self.source + + def __enter__( self ): + pass + + def __exit__( self, *args ): + self.source.close() + + +# ----------------------------------------------------------------------------- generic compression +class GzipDataProvider( base.DataProvider ): + """ + Data provider that uses g(un)zip on a file as it's source. + + This can be piped through other providers (column, map, genome region, etc.). + """ + def __init__( self, source, **kwargs ): + unzipped = gzip.GzipFile( source, 'rb' ) + super( GzipDataProvider, self ).__init__( unzipped, **kwargs ) + #NOTE: the GzipFile is now accessible in self.source + + +# ----------------------------------------------------------------------------- intermediate tempfile +class TempfileDataProvider( base.DataProvider ): + """ + Writes the data from the given source to a temp file, allowing + it to be used as a source where a file_name is needed (e.g. as a parameter + to a command line tool: samtools view -t <this_provider.source.file_name>) + """ + def __init__( self, source, **kwargs ): + #TODO: + raise NotImplementedError() + # write the file here + self.create_file + super( TempfileDataProvider, self ).__init__( self.tmp_file, **kwargs ) + + def create_file( self ): + self.tmp_file = tempfile.NamedTemporaryFile() + return self.tmp_file + + def write_to_file( self ): + parent_gen = super( TempfileDataProvider, self ).__iter__() + #??? + with open( self.tmp_file, 'w' ) as open_file: + for datum in parent_gen: + open_file.write( datum + '\n' ) + This diff is so big that we needed to truncate the remainder. https://bitbucket.org/galaxy/galaxy-central/commits/701a94aadeb1/ Changeset: 701a94aadeb1 User: dannon Date: 2013-06-18 03:43:15 Summary: Remove extra debug from job remapping code. 
Affected #: 1 file diff -r 8df7f1c474896ed75557efc6f3359d09c3772c26 -r 701a94aadeb11df9bd0c70dd63efa1b5492d5524 lib/galaxy/tools/__init__.py --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -1940,7 +1940,6 @@ rerun_remap_job_id = None if state.rerun_remap_job_id is not None: rerun_remap_job_id = trans.app.security.decode_id(state.rerun_remap_job_id) - log.debug('######### %s/%s' % (state.rerun_remap_job_id, rerun_remap_job_id)) _, out_data = self.execute( trans, incoming=params, history=history, rerun_remap_job_id=rerun_remap_job_id ) except httpexceptions.HTTPFound, e: #if it's a paste redirect exception, pass it up the stack https://bitbucket.org/galaxy/galaxy-central/commits/df0a48ec8514/ Changeset: df0a48ec8514 User: dannon Date: 2013-06-18 04:03:57 Summary: Remove unused assignment in tool_runner Affected #: 1 file diff -r 701a94aadeb11df9bd0c70dd63efa1b5492d5524 -r df0a48ec8514db4e8fcf4d31006c2dc318636b39 lib/galaxy/webapps/galaxy/controllers/tool_runner.py --- a/lib/galaxy/webapps/galaxy/controllers/tool_runner.py +++ b/lib/galaxy/webapps/galaxy/controllers/tool_runner.py @@ -295,7 +295,7 @@ tool_state.decode( encoded_state, tool, trans.app ) else: tool_state = tool.new_state( trans ) - errors = tool.update_state( trans, tool.inputs, tool_state.inputs, kwd, update_only = True ) + tool.update_state( trans, tool.inputs, tool_state.inputs, kwd, update_only = True ) datasets = [] dataset_upload_inputs = [] for input_name, input in tool.inputs.iteritems(): https://bitbucket.org/galaxy/galaxy-central/commits/88f3a639d3c1/ Changeset: 88f3a639d3c1 User: dannon Date: 2013-06-18 04:04:59 Summary: Fix rerun_remap checkbox to work only when you want it to, instead of all the time. Affected #: 1 file diff -r df0a48ec8514db4e8fcf4d31006c2dc318636b39 -r 88f3a639d3c198b5d5e0f99570f243756cae9a03 lib/galaxy/tools/__init__.py --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -1938,8 +1938,8 @@ elif state.page == self.last_page: try: rerun_remap_job_id = None - if state.rerun_remap_job_id is not None: - rerun_remap_job_id = trans.app.security.decode_id(state.rerun_remap_job_id) + if 'rerun_remap_job_id' in incoming: + rerun_remap_job_id = trans.app.security.decode_id(incoming['rerun_remap_job_id']) _, out_data = self.execute( trans, incoming=params, history=history, rerun_remap_job_id=rerun_remap_job_id ) except httpexceptions.HTTPFound, e: #if it's a paste redirect exception, pass it up the stack https://bitbucket.org/galaxy/galaxy-central/commits/d2d4b6ac2139/ Changeset: d2d4b6ac2139 User: dannon Date: 2013-06-18 04:06:19 Summary: Relocate flush in tool execute. Now works when not remapping. Affected #: 1 file diff -r 88f3a639d3c198b5d5e0f99570f243756cae9a03 -r d2d4b6ac2139f1623301990462a5e63b2952dcc1 lib/galaxy/tools/actions/__init__.py --- a/lib/galaxy/tools/actions/__init__.py +++ b/lib/galaxy/tools/actions/__init__.py @@ -442,9 +442,9 @@ trans.sa_session.add(jtid) jtod.dataset.visible = False trans.sa_session.add(jtod) - trans.sa_session.flush() except Exception, e: log.exception('Cannot remap rerun dependencies.') + trans.sa_session.flush() # Some tools are not really executable, but jobs are still created for them ( for record keeping ). # Examples include tools that redirect to other applications ( epigraph ). These special tools must # include something that can be retrieved from the params ( e.g., REDIRECT_URL ) to keep the job Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. 
You are receiving this because you have the service enabled and are a recipient of this email.
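
Because the dataproviders diff above is truncated, the registration pattern added in lib/galaxy/datatypes/dataproviders/decorators.py is easy to miss: dataprovider_factory( name ) tags a datatype method with a lookup name, and has_dataproviders scans the class and gathers every tagged method into a class-level 'dataproviders' map. The snippet below is a minimal, standalone sketch of that pattern only, assuming nothing beyond the two decorator behaviors described in the diff's docstrings; FakeTabularDatatype, the 'genomic-region' key, and the 'fake-dataset' argument are invented for illustration and are not Galaxy code.

import copy
from functools import wraps

_MAP_KEY = 'dataproviders'
_NAME_KEY = '_dataprovider_name'


def dataprovider_factory( name ):
    """Mark a factory method with the name it should be registered under."""
    def named_dataprovider_factory( func ):
        setattr( func, _NAME_KEY, name )
        @wraps( func )
        def wrapped( self, *args, **kwargs ):
            return func( self, *args, **kwargs )
        return wrapped
    return named_dataprovider_factory


def has_dataproviders( cls ):
    """Collect every decorated method of cls into cls.dataproviders."""
    # deepcopy any inherited map so a subclass does not mutate its parent's registry
    registry = copy.deepcopy( getattr( cls, _MAP_KEY, {} ) )
    setattr( cls, _MAP_KEY, registry )
    for attr_name, attr_value in cls.__dict__.items():
        provider_name = getattr( attr_value, _NAME_KEY, None )
        if callable( attr_value ) and provider_name:
            registry[ provider_name ] = attr_value
    return cls


@has_dataproviders
class FakeTabularDatatype( object ):
    """Hypothetical stand-in for a Galaxy Datatype class."""

    @dataprovider_factory( 'genomic-region' )
    def genomic_region_dataprovider( self, dataset, **settings ):
        # the real factories wrap the dataset in a DatasetDataProvider and chain
        # a provider such as GenomicRegionDataProvider on top; here we just echo
        return ( 'genomic-region', dataset, settings )


datatype = FakeTabularDatatype()
factory = datatype.dataproviders[ 'genomic-region' ]
# the map stores plain functions, so pass the datatype instance explicitly
provider = factory( datatype, 'fake-dataset', named_columns=True )
assert provider == ( 'genomic-region', 'fake-dataset', { 'named_columns': True } )

In the committed module the registered factories are then reached either through dataset.datatype.dataprovider( dataset, format, **settings ) or directly via the dataproviders map, as the has_dataproviders docstring in the diff shows; the deepcopy step mirrors the diff's note that subclasses must not modify their superclass's provider map.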