commit/galaxy-central: natefoo: Library datasets can now be job inputs - needed to be able to use the deferred job runner and transfer manager with set_metadata_externally = True.
1 new changeset in galaxy-central:

http://bitbucket.org/galaxy/galaxy-central/changeset/e04154ed4639/
changeset:   e04154ed4639
user:        natefoo
date:        2011-06-17 22:06:17
summary:     Library datasets can now be job inputs - needed to be able to use the deferred job runner and transfer manager with set_metadata_externally = True.
affected #:  7 files (2.6 KB)

--- a/lib/galaxy/jobs/__init__.py	Fri Oct 22 14:30:58 2010 +0100
+++ b/lib/galaxy/jobs/__init__.py	Fri Jun 17 16:06:17 2011 -0400
@@ -229,7 +229,7 @@
             return JOB_DELETED
         elif job.state == model.Job.states.ERROR:
             return JOB_ADMIN_DELETED
-        for dataset_assoc in job.input_datasets:
+        for dataset_assoc in job.input_datasets + job.input_library_datasets:
             idata = dataset_assoc.dataset
             if not idata:
                 continue
@@ -330,6 +330,7 @@
         # Restore input / output data lists
         inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
         out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
+        inp_data.update( [ ( da.name, da.dataset ) for da in job.input_library_datasets ] )
         out_data.update( [ ( da.name, da.dataset ) for da in job.output_library_datasets ] )

         # Set up output dataset association for export history jobs. Because job
@@ -614,6 +615,7 @@
         # custom post process setup
         inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
         out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
+        inp_data.update( [ ( da.name, da.dataset ) for da in job.input_library_datasets ] )
         out_data.update( [ ( da.name, da.dataset ) for da in job.output_library_datasets ] )
         param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] ) # why not re-use self.param_dict here? ##dunno...probably should, this causes tools.parameters.basic.UnvalidatedValue to be used in following methods instead of validated and transformed values during i.e. running workflows
         param_dict = self.tool.params_from_strings( param_dict, self.app )
@@ -665,7 +667,7 @@
     def get_input_fnames( self ):
         job = self.get_job()
         filenames = []
-        for da in job.input_datasets: #da is JobToInputDatasetAssociation object
+        for da in job.input_datasets + job.input_library_datasets: #da is JobToInputDatasetAssociation object
             if da.dataset:
                 filenames.append( da.dataset.file_name )
                 #we will need to stage in metadata file names also
@@ -861,9 +863,10 @@
         self.tool.handle_unvalidated_param_values( incoming, self.app )
         # Restore input / output data lists
         inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
+        out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
+        inp_data.update( [ ( da.name, da.dataset ) for da in job.input_library_datasets ] )
+        out_data.update( [ ( da.name, da.dataset ) for da in job.output_library_datasets ] )
         # DBTODO New method for generating command line for a task?
-        out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
-        out_data.update( [ ( da.name, da.dataset ) for da in job.output_library_datasets ] )
         # These can be passed on the command line if wanted as $userId $userEmail
         if job.history and job.history.user: # check for anonymous user!
             userId = '%d' % job.history.user.id

--- a/lib/galaxy/jobs/deferred/__init__.py	Fri Oct 22 14:30:58 2010 +0100
+++ b/lib/galaxy/jobs/deferred/__init__.py	Fri Jun 17 16:06:17 2011 -0400
@@ -98,14 +98,14 @@
                 job_state = self.plugins[job.plugin].check_job( job )
             except Exception, e:
                 self.__fail_job( job )
-                log.error( 'Set deferred job %s to error because of an exception in check_job(): %s' % ( job.id, str( e ) ) )
+                log.exception( 'Set deferred job %s to error because of an exception in check_job(): %s' % ( job.id, str( e ) ) )
                 continue
             if job_state == self.job_states.READY:
                 try:
                     self.plugins[job.plugin].run_job( job )
                 except Exception, e:
                     self.__fail_job( job )
-                    log.error( 'Set deferred job %s to error because of an exception in run_job(): %s' % ( job.id, str( e ) ) )
+                    log.exception( 'Set deferred job %s to error because of an exception in run_job(): %s' % ( job.id, str( e ) ) )
                     continue
             elif job_state == self.job_states.INVALID:
                 self.__fail_job( job )
@@ -160,8 +160,14 @@
         self.app = app
         self.sa_session = app.model.context.current
         self.dummy = Dummy()
-        self.history = history
-        self.user = user
+        if not history:
+            self.history = Dummy()
+        else:
+            self.history = history
+        if not user:
+            self.user = Dummy()
+        else:
+            self.user = user
         self.model = app.model
     def get_galaxy_session( self ):
         return self.dummy

--- a/lib/galaxy/jobs/deferred/data_transfer.py	Fri Oct 22 14:30:58 2010 +0100
+++ b/lib/galaxy/jobs/deferred/data_transfer.py	Fri Jun 17 16:06:17 2011 -0400
@@ -93,12 +93,18 @@
         # In this case, job.params will be a dictionary that contains a key named 'result'. The value
         # of the result key is a dictionary that looks something like:
         # {'sample_dataset_id': '8', 'status': 'Not started', 'protocol': 'scp', 'name': '3.bed',
-        #  'file_path': '/tmp/library/3.bed', 'host': '127.0.0.1', 'sample_id': 8, 'external_service_id': 2,
-        #  'password': 'galaxy', 'user_name': 'gvk', 'error_msg': '', 'size': '8.0K'}
-        result_dict = job.params[ 'result' ]
+        #  'file_path': '/data/library/3.bed', 'host': '127.0.0.1', 'sample_id': 8, 'external_service_id': 2,
+        #  'local_path': '/tmp/kjl2Ss4', 'password': 'galaxy', 'user_name': 'gvk', 'error_msg': '', 'size': '8.0K'}
+        try:
+            tj = self.sa_session.query( self.app.model.TransferJob ).get( int( job.params['transfer_job_id'] ) )
+            result_dict = tj.params
+            result_dict['local_path'] = tj.path
+        except Exception, e:
+            log.error( "Updated transfer result unavailable, using old result. Error was: %s" % str( e ) )
+            result_dict = job.params[ 'result' ]
         library_dataset_name = result_dict[ 'name' ]
         # Determine the data format (see the relevant TODO item in the manual_data_transfer plugin)..
-        extension = sniff.guess_ext( result_dict[ 'file_path' ], sniff_order=self.app.datatypes_registry.sniff_order )
+        extension = sniff.guess_ext( result_dict[ 'local_path' ], sniff_order=self.app.datatypes_registry.sniff_order )
         self._update_sample_dataset_status( protocol=job.params[ 'protocol' ],
                                             sample_id=int( job.params[ 'sample_id' ] ),
                                             result_dict=result_dict,
@@ -132,7 +138,9 @@
                     setattr( ldda.metadata, name, spec.unwrap( spec.get( 'default' ) ) )
             if self.app.config.set_metadata_externally:
                 self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute( self.app.datatypes_registry.set_external_metadata_tool,
-                                                                                            FakeTrans( self.app ),
+                                                                                            FakeTrans( self.app,
+                                                                                                       history=sample.history,
+                                                                                                       user=sample.request.user ),
                                                                                             incoming = { 'input1':ldda } )
             else:
                 ldda.set_meta()

--- a/lib/galaxy/jobs/transfer_manager.py	Fri Oct 22 14:30:58 2010 +0100
+++ b/lib/galaxy/jobs/transfer_manager.py	Fri Jun 17 16:06:17 2011 -0400
@@ -106,6 +106,7 @@
                 error = dict( code=256, message='Error connecting to transfer daemon', data=str( e ) )
                 rval.append( dict( transfer_job_id=tj.id, state=tj.state, error=error ) )
             else:
+                self.sa_session.refresh( tj )
                 rval.append( dict( transfer_job_id=tj.id, state=tj.state ) )
         for tj_state in rval:
             if tj_state['state'] in self.app.model.TransferJob.terminal_states:

--- a/lib/galaxy/model/__init__.py	Fri Oct 22 14:30:58 2010 +0100
+++ b/lib/galaxy/model/__init__.py	Fri Jun 17 16:06:17 2011 -0400
@@ -95,6 +95,7 @@
         self.parameters = []
         self.input_datasets = []
         self.output_datasets = []
+        self.input_library_datasets = []
         self.output_library_datasets = []
         self.state = Job.states.NEW
         self.info = None
@@ -109,6 +110,8 @@
         self.input_datasets.append( JobToInputDatasetAssociation( name, dataset ) )
     def add_output_dataset( self, name, dataset ):
         self.output_datasets.append( JobToOutputDatasetAssociation( name, dataset ) )
+    def add_input_library_dataset( self, name, dataset ):
+        self.input_library_datasets.append( JobToInputLibraryDatasetAssociation( name, dataset ) )
     def add_output_library_dataset( self, name, dataset ):
         self.output_library_datasets.append( JobToOutputLibraryDatasetAssociation( name, dataset ) )
     def add_post_job_action(self, pja):
@@ -215,6 +218,11 @@
         self.name = name
         self.dataset = dataset

+class JobToInputLibraryDatasetAssociation( object ):
+    def __init__( self, name, dataset ):
+        self.name = name
+        self.dataset = dataset
+
 class JobToOutputLibraryDatasetAssociation( object ):
     def __init__( self, name, dataset ):
         self.name = name

--- a/lib/galaxy/model/mapping.py	Fri Oct 22 14:30:58 2010 +0100
+++ b/lib/galaxy/model/mapping.py	Fri Jun 17 16:06:17 2011 -0400
@@ -370,6 +370,12 @@
     Column( "dataset_id", Integer, ForeignKey( "history_dataset_association.id" ), index=True ),
     Column( "name", String(255) ) )

+JobToInputLibraryDatasetAssociation.table = Table( "job_to_input_library_dataset", metadata,
+    Column( "id", Integer, primary_key=True ),
+    Column( "job_id", Integer, ForeignKey( "job.id" ), index=True ),
+    Column( "ldda_id", Integer, ForeignKey( "library_dataset_dataset_association.id" ), index=True ),
+    Column( "name", String(255) ) )
+
 JobToOutputLibraryDatasetAssociation.table = Table( "job_to_output_library_dataset", metadata,
     Column( "id", Integer, primary_key=True ),
     Column( "job_id", Integer, ForeignKey( "job.id" ), index=True ),
@@ -1376,6 +1382,9 @@
 assign_mapper( context, JobToOutputDatasetAssociation, JobToOutputDatasetAssociation.table,
     properties=dict( job=relation( Job ), dataset=relation( HistoryDatasetAssociation, lazy=False ) ) )

+assign_mapper( context, JobToInputLibraryDatasetAssociation, JobToInputLibraryDatasetAssociation.table,
+    properties=dict( job=relation( Job ), dataset=relation( LibraryDatasetDatasetAssociation, lazy=False ) ) )
+
 assign_mapper( context, JobToOutputLibraryDatasetAssociation, JobToOutputLibraryDatasetAssociation.table,
     properties=dict( job=relation( Job ), dataset=relation( LibraryDatasetDatasetAssociation, lazy=False ) ) )

@@ -1410,6 +1419,7 @@
                      input_datasets=relation( JobToInputDatasetAssociation ),
                      output_datasets=relation( JobToOutputDatasetAssociation ),
                      post_job_actions=relation( PostJobActionAssociation, lazy=False ),
+                     input_library_datasets=relation( JobToInputLibraryDatasetAssociation ),
                      output_library_datasets=relation( JobToOutputLibraryDatasetAssociation ),
                      external_output_metadata = relation( JobExternalOutputMetadata, lazy = False ) ) )

--- a/lib/galaxy/tools/actions/metadata.py	Fri Oct 22 14:30:58 2010 +0100
+++ b/lib/galaxy/tools/actions/metadata.py	Fri Jun 17 16:06:17 2011 -0400
@@ -13,6 +13,12 @@
             if isinstance( value, trans.app.model.HistoryDatasetAssociation ):
                 dataset = value
                 dataset_name = name
+                type = 'hda'
+                break
+            elif isinstance( value, trans.app.model.LibraryDatasetDatasetAssociation ):
+                dataset = value
+                dataset_name = name
+                type = 'ldda'
                 break
         else:
             raise Exception( 'The dataset to set metadata on could not be determined.' )
@@ -22,6 +28,8 @@
         job.session_id = trans.get_galaxy_session().id
         job.history_id = trans.history.id
         job.tool_id = tool.id
+        if trans.user:
+            job.user_id = trans.user.id
         start_job_state = job.state #should be job.states.NEW
         try:
             # For backward compatibility, some tools may not have versions yet.
@@ -50,7 +58,10 @@
         for name, value in tool.params_to_strings( incoming, trans.app ).iteritems():
             job.add_parameter( name, value )
         #add the dataset to job_to_input_dataset table
-        job.add_input_dataset( dataset_name, dataset )
+        if type == 'hda':
+            job.add_input_dataset( dataset_name, dataset )
+        elif type == 'ldda':
+            job.add_input_library_dataset( dataset_name, dataset )
         #Need a special state here to show that metadata is being set and also allow the job to run
         # i.e. if state was set to 'running' the set metadata job would never run, as it would wait for input (the dataset to set metadata on) to be in a ready state
         dataset._state = dataset.states.SETTING_METADATA

Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--

This is a commit notification from bitbucket.org. You are receiving this
because you have the service enabled, addressing the recipient of this
email.
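The heart of the changeset is the new JobToInputLibraryDatasetAssociation model/table plus the branch added to lib/galaxy/tools/actions/metadata.py, which lets the external set-metadata job record a library dataset (LDDA) as its input the same way it already recorded history datasets (HDAs). A minimal sketch of that dispatch pattern follows; the class and method names are taken from the diff above, but the attach_metadata_input helper itself is illustrative and not part of the changeset.

# Illustrative sketch only -- not part of this changeset.
# HistoryDatasetAssociation, LibraryDatasetDatasetAssociation,
# add_input_dataset and add_input_library_dataset are the names used in the
# diff; the helper function itself is hypothetical.
def attach_metadata_input( job, name, dataset, model ):
    """Attach the dataset whose metadata is being set as a job input,
    choosing the association type by dataset class."""
    if isinstance( dataset, model.HistoryDatasetAssociation ):
        # History datasets use the pre-existing job_to_input_dataset table.
        job.add_input_dataset( name, dataset )
    elif isinstance( dataset, model.LibraryDatasetDatasetAssociation ):
        # Library datasets now have their own job_to_input_library_dataset
        # table, so deferred-transfer uploads can have metadata set
        # externally and still be tracked as job inputs.
        job.add_input_library_dataset( name, dataset )
    else:
        raise Exception( 'The dataset to set metadata on could not be determined.' )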