1 new changeset in galaxy-central:

http://bitbucket.org/galaxy/galaxy-central/changeset/f1a250bbff0b/
changeset: r5373:f1a250bbff0b
user:      greg
date:      2011-04-13 19:52:05
summary:   1. Add support for manual file transfer from external services to a Galaxy data library using scp. This is
currently enabled via the enable_beta_job_managers flag, but should not yet be used because it hits the database
heavily to determine whether deferred jobs are queued for running. The plan is to use messaging for this, but that
approach is not yet implemented. RabbitMQ is still used to enable manual file transfer to Galaxy data libraries when
the enable_beta_job_managers flag is not set.
2. Clean up the code that renders the jQuery file browser plugin - things now work much more cleanly. One remaining
item is handling the case where rsh / ssh keys are not set. Also clean up a lot of code related to manual file
transfers in the requests_admin controller.
3. In addition to the new ManualDataTransferPlugin, add the Pacific Biosciences SMRTPortalPlugin, which automatically
transfers files produced by SMRT Portal to a Galaxy data library.
4. Add the external service type config for Pac Bio SMRT Portal version 1.1.0.

affected #: 17 files (6.9 KB)

--- a/external_service_types/454_life_sciences.xml Tue Apr 12 23:41:06 2011 -0400
+++ b/external_service_types/454_life_sciences.xml Wed Apr 13 13:52:05 2011 -0400
@@ -3,7 +3,7 @@
     <version>1</version>
     <data_transfer_settings>
-        <data_transfer type='scp' automatic_transfer='False' host='host' user_name='user_name' password='password' data_location='data_location' />
+        <data_transfer protocol='scp' automatic_transfer='False' host='host' user_name='user_name' password='password' data_location='data_location' />
     </data_transfer_settings>
     <run_details>

--- a/external_service_types/applied_biosystems_solid.xml Tue Apr 12 23:41:06 2011 -0400
+++ b/external_service_types/applied_biosystems_solid.xml Wed Apr 13 13:52:05 2011 -0400
@@ -3,7 +3,7 @@
     <version>3</version>
     <data_transfer_settings>
-        <data_transfer type='scp' automatic_transfer='False' host='host' user_name='user_name' password='password' data_location='data_location' rename_dataset='rename_dataset' />
+        <data_transfer protocol='scp' automatic_transfer='False' host='host' user_name='user_name' password='password' data_location='data_location' rename_dataset='rename_dataset' />
     </data_transfer_settings>
     <run_details>
@@ -31,7 +31,7 @@
     <field name="dataset2_name" type="text" label="Sample run output 2" description="" value="Quality file" required="True" />
     <field name="dataset2_datatype" type="text" label="Sample run datatype 2" description="" value="qual" required="True" />
     <field name="dataset3_name" type="text" label="Sample run output 3" description="" value="STATS file" required="True" />
-    <field name="dataset3_datatype" type="text" label="Sample run datatype 3" description="" value="stats" required="True" />
+    <field name="dataset3_datatype" type="text" label="Sample run datatype 3" description="" value="txt" required="True" />
    </fields>
</form>

--- a/external_service_types/simple_unknown_sequencer.xml Tue Apr 12 23:41:06 2011 -0400
+++ b/external_service_types/simple_unknown_sequencer.xml Wed Apr 13 13:52:05 2011 -0400
@@ -7,7 +7,7 @@
    <description></description>
    <version></version>
    <data_transfer_settings>
-       <data_transfer type='scp' automatic_transfer='False' host='host' user_name='user_name' password='password' data_location='data_location'/>
+       <data_transfer protocol='scp' automatic_transfer='False' host='host' user_name='user_name' password='password' data_location='data_location'/>
    </data_transfer_settings>
<form type="external_service" name="Simple unknown sequencer form" description="">
   <fields>
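
Across these configs, the data_transfer element's 'type' attribute becomes 'protocol'. A minimal standalone sketch of
reading the renamed attribute with ElementTree - illustrative only; Galaxy's real parsing lives in
lib/galaxy/sample_tracking/external_service_types.py, changed below:

import xml.etree.ElementTree as ET

config = """
<data_transfer_settings>
    <data_transfer protocol='scp' automatic_transfer='False' host='host'
                   user_name='user_name' password='password' data_location='data_location' />
</data_transfer_settings>
"""

for elem in ET.fromstring( config ).findall( 'data_transfer' ):
    # Renamed in this changeset: read 'protocol' rather than 'type'.
    if elem.get( 'protocol' ) == 'scp':
        print 'scp transfer, automatic_transfer=%s' % elem.get( 'automatic_transfer' )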
--- a/lib/galaxy/jobs/deferred/__init__.py Tue Apr 12 23:41:06 2011 -0400
+++ b/lib/galaxy/jobs/deferred/__init__.py Wed Apr 13 13:52:05 2011 -0400
@@ -26,7 +26,6 @@
         self.monitor_thread = threading.Thread( target=self.__monitor )
         self.monitor_thread.start()
         log.info( 'Deferred job queue started' )
-
     def _load_plugins( self ):
         for fname in os.listdir( os.path.dirname( __file__ ) ):
             if not fname.startswith( '_' ) and fname.endswith( '.py' ):
@@ -53,7 +52,6 @@
                     self.plugins[obj] = plugin( self.app )
                     self.plugins[obj].job_states = self.job_states
                     log.debug( 'Loaded deferred job plugin: %s' % display_name )
-
     def __check_jobs_at_startup( self ):
         waiting_jobs = self.sa_session.query( model.DeferredJob ) \
                                       .filter( model.DeferredJob.state == model.DeferredJob.states.WAITING ).all()
@@ -66,7 +64,6 @@
             # Pass the job ID as opposed to the job, since the monitor thread
             # needs to load it in its own threadlocal scoped session.
             self.waiting_jobs.append( job.id )
-
     def __monitor( self ):
         while self.running:
             try:
@@ -75,7 +72,6 @@
                 log.exception( 'Exception in monitor_step' )
             self.sleeper.sleep( 1 )
         log.info( 'job queue stopped' )
-
     def __monitor_step( self ):
         # TODO: Querying the database with this frequency is bad, we need message passing
         new_jobs = self.sa_session.query( model.DeferredJob ) \
@@ -121,7 +117,6 @@
             else:
                 new_waiting.append( job )
         self.waiting_jobs = new_waiting
-
     def __check_job_plugin( self, job ):
         if job.plugin not in self.plugins:
             log.error( 'Invalid deferred job plugin: %s' ) % job.plugin
@@ -130,15 +125,12 @@
             self.sa_session.flush()
             return False
         return True
-
     def __check_if_ready_to_run( self, job ):
         return self.plugins[job.plugin].check_job( job )
-
    def __fail_job( self, job ):
        job.state = model.DeferredJob.states.ERROR
        self.sa_session.add( job )
        self.sa_session.flush()
-
    def shutdown( self ):
        self.running = False
        self.sleeper.wake()
@@ -158,3 +150,34 @@
         self.condition.acquire()
         self.condition.notify()
         self.condition.release()
+
+class FakeTrans( object ):
+    """A fake trans for calling the external set metadata tool"""
+    def __init__( self, app, history=None, user=None):
+        class Dummy( object ):
+            def __init__( self ):
+                self.id = None
+        self.app = app
+        self.sa_session = app.model.context.current
+        self.dummy = Dummy()
+        self.history = history
+        self.user = user
+        self.model = app.model
+    def get_galaxy_session( self ):
+        return self.dummy
+    def log_event( self, message, tool_id=None ):
+        pass
+    def get_current_user_roles( self ):
+        if self.user:
+            return self.user.all_roles()
+        else:
+            return []
+    def db_dataset_for( self, dbkey ):
+        if self.history is None:
+            return None
+        datasets = self.sa_session.query( self.app.model.HistoryDatasetAssociation ) \
+                                  .filter_by( deleted=False, history_id=self.history.id, extension="len" )
+        for ds in datasets:
+            if dbkey == ds.dbkey:
+                return ds
+        return None
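
The queue above polls each waiting job's plugin via check_job() in __monitor_step(), which is exactly the db load the
commit message warns about. A hedged sketch of the plugin contract this implies - the class, its method bodies, and
the state name are illustrative assumptions, not the shipped ManualDataTransferPlugin:

class ExampleDataTransferPlugin( object ):
    """Illustrative deferred job plugin skeleton."""
    def __init__( self, app ):
        self.app = app
        # The queue assigns job_states after instantiation ( see _load_plugins above ).
        self.job_states = None
    def create_job( self, trans, **kwd ):
        # A real plugin would create and flush a DeferredJob carrying kwd as its params.
        pass
    def check_job( self, job ):
        # Called by __monitor_step() on every pass; return a state that tells
        # the queue whether the job can run ( state name assumed here ).
        return self.job_states.READY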
--- a/lib/galaxy/jobs/transfer_manager.py Tue Apr 12 23:41:06 2011 -0400
+++ b/lib/galaxy/jobs/transfer_manager.py Wed Apr 13 13:52:05 2011 -0400
@@ -24,11 +24,20 @@
         self.restarter = threading.Thread( target=self.__restarter )
         self.restarter.start()
     def new( self, path=None, **kwd ):
-        if 'url' not in kwd:
-            raise Exception( 'Missing required parameter "url".' )
-        # try: except JSON:
-        transfer_job = self.app.model.TransferJob( state=self.app.model.TransferJob.states.NEW,
-                                                   params=kwd )
+        if 'protocol' not in kwd:
+            raise Exception( 'Missing required parameter "protocol".' )
+        protocol = kwd[ 'protocol' ]
+        if protocol in [ 'http', 'https' ]:
+            if 'url' not in kwd:
+                raise Exception( 'Missing required parameter "url".' )
+            transfer_job = self.app.model.TransferJob( state=self.app.model.TransferJob.states.NEW, params=kwd )
+        elif protocol == 'scp':
+            # TODO: add more checks here?
+            if 'sample_dataset_id' not in kwd:
+                raise Exception( 'Missing required parameter "sample_dataset_id".' )
+            if 'file_path' not in kwd:
+                raise Exception( 'Missing required parameter "file_path".' )
+            transfer_job = self.app.model.TransferJob( state=self.app.model.TransferJob.states.NEW, params=kwd )
         self.sa_session.add( transfer_job )
         self.sa_session.flush()
         return transfer_job
@@ -48,6 +57,8 @@
         self.sa_session.add_all( transfer_jobs )
         self.sa_session.flush()
         for tj in transfer_jobs:
+            params_dict = tj.params
+            protocol = params_dict[ 'protocol' ]
             # The transfer script should daemonize fairly quickly - if this is
             # not the case, this process will need to be moved to a
             # non-blocking method.
@@ -101,7 +112,7 @@
             if tj_state['state'] in self.app.model.TransferJob.terminal_states:
                 log.debug( 'Transfer job %s is in terminal state: %s' % ( tj_state['transfer_job_id'], tj_state['state'] ) )
             elif tj_state['state'] == self.app.model.TransferJob.states.PROGRESS and 'percent' in tj_state:
-                log.debug( 'Transfer job %s is %s%% complete' % ( tj_state['transfer_job_id'], tj_state['percent'] ) )
+                log.debug( 'Transfer job %s is %s%% complete' % ( tj_state[ 'transfer_job_id' ], tj_state[ 'percent' ] ) )
         if len( rval ) == 1:
             return rval[0]
         return rval
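
A usage sketch of the extended new() contract above: http(s) jobs still require 'url', while scp jobs must carry
'sample_dataset_id' and 'file_path'. The manager instance and all values are illustrative; the extra scp params are
the ones scripts/transfer.py reads back out of transfer_job.params:

def queue_transfers( transfer_manager, sample_dataset ):
    # http(s): 'url' is required, as before.
    http_job = transfer_manager.new( protocol='http',
                                     url='http://example.org/run/dataset.fastq' )
    # scp: 'sample_dataset_id' and 'file_path' are required; the host and
    # credentials also travel in params for scp_transfer() to use.
    scp_job = transfer_manager.new( protocol='scp',
                                    sample_dataset_id=sample_dataset.id,
                                    file_path='/data/runs/dataset.fastq',
                                    host='sequencer.example.org',
                                    user_name='galaxy',
                                    password='secret' )
    return http_job, scp_job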
--- a/lib/galaxy/model/__init__.py Tue Apr 12 23:41:06 2011 -0400
+++ b/lib/galaxy/model/__init__.py Wed Apr 13 13:52:05 2011 -0400
@@ -1796,7 +1796,9 @@
         self.comment = comment

 class ExternalService( object ):
-    data_transfer_types = Bunch( SCP = 'scp' )
+    data_transfer_protocol = Bunch( HTTP = 'http',
+                                    HTTPS = 'https',
+                                    SCP = 'scp' )
     def __init__( self, name=None, description=None, external_service_type_id=None, version=None, form_definition_id=None, form_values_id=None, deleted=None ):
         self.name = name
         self.description = description
@@ -1812,8 +1814,8 @@
         trans.app.external_service_types.reload( self.external_service_type_id )
         self.data_transfer = {}
         external_service_type = self.get_external_service_type( trans )
-        for data_transfer_type, data_transfer_obj in external_service_type.data_transfer.items():
-            if data_transfer_type == self.data_transfer_types.SCP:
+        for data_transfer_protocol, data_transfer_obj in external_service_type.data_transfer.items():
+            if data_transfer_protocol == self.data_transfer_protocol.SCP:
                 scp_configs = {}
                 automatic_transfer = data_transfer_obj.config.get( 'automatic_transfer', 'false' )
                 scp_configs[ 'automatic_transfer' ] = util.string_as_bool( automatic_transfer )
@@ -1822,7 +1824,7 @@
                 scp_configs[ 'password' ] = self.form_values.content.get( data_transfer_obj.config.get( 'password', '' ), '' )
                 scp_configs[ 'data_location' ] = self.form_values.content.get( data_transfer_obj.config.get( 'data_location', '' ), '' )
                 scp_configs[ 'rename_dataset' ] = self.form_values.content.get( data_transfer_obj.config.get( 'rename_dataset', '' ), '' )
-                self.data_transfer[ self.data_transfer_types.SCP ] = scp_configs
+                self.data_transfer[ self.data_transfer_protocol.SCP ] = scp_configs
     def populate_actions( self, trans, item, param_dict=None ):
         return self.get_external_service_type( trans ).actions.populate( self, item, param_dict=param_dict )
@@ -1992,14 +1994,14 @@
         def print_ticks( d ):
             pass
         error_msg = 'Error encountered in determining the file size of %s on the external_service.' % filepath
-        if not scp_configs['host'] or not scp_configs['user_name'] or not scp_configs['password']:
+        if not scp_configs[ 'host' ] or not scp_configs[ 'user_name' ] or not scp_configs[ 'password' ]:
             return error_msg
         login_str = '%s@%s' % ( scp_configs['user_name'], scp_configs['host'] )
         cmd = 'ssh %s "du -sh \'%s\'"' % ( login_str, filepath )
         try:
             output = pexpect.run( cmd,
-                                  events={ '.ssword:*': scp_configs['password']+'\r\n',
-                                           pexpect.TIMEOUT:print_ticks},
+                                  events={ '.ssword:*' : scp_configs['password'] + '\r\n',
+                                           pexpect.TIMEOUT : print_ticks },
                                   timeout=10 )
         except Exception, e:
             return error_msg
@@ -2013,7 +2015,7 @@
     def run_details( self ):
         # self.runs is a list of SampleRunAssociations ordered descending on update_time.
         if self.runs:
-            # Always use the lates run details template, self.runs[0] is a SampleRunAssociation
+            # Always use the latest run details template, self.runs[0] is a SampleRunAssociation
             return self.runs[0]
         # Inherit this sample's RequestType run details, if one exists.
         return self.request.type.run_details

--- a/lib/galaxy/sample_tracking/data_transfer.py Tue Apr 12 23:41:06 2011 -0400
+++ b/lib/galaxy/sample_tracking/data_transfer.py Wed Apr 13 13:52:05 2011 -0400
@@ -14,11 +14,11 @@
         # TODO: The 'automatic_transfer' setting is for future use.  If set to True, we will need to
         # ensure the sample has an associated destination data library before it moves to a certain state
         # ( e.g., Run started ).
-        self.config['automatic_transfer'] = elem.get( 'automatic_transfer' )
-        self.config['host'] = elem.get( 'host' )
-        self.config['user_name'] = elem.get( 'user_name' )
-        self.config['password'] = elem.get( 'password' )
-        self.config['data_location'] = elem.get( 'data_location' )
+        self.config[ 'automatic_transfer' ] = elem.get( 'automatic_transfer' )
+        self.config[ 'host' ] = elem.get( 'host' )
+        self.config[ 'user_name' ] = elem.get( 'user_name' )
+        self.config[ 'password' ] = elem.get( 'password' )
+        self.config[ 'data_location' ] = elem.get( 'data_location' )
         # 'rename_dataset' is optional and it may not be defined in all external types
         # It is only used is AB SOLiD external service type for now
         rename_dataset = elem.get( 'rename_dataset', None )
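
load_data_transfer_settings() above resolves each configured attribute through form_values, ending up with a plain
dict of scp settings. A self-contained sketch of that dict's shape and of the pexpect probe used to size a remote
file - host and credentials are placeholders:

import pexpect

scp_configs = { 'automatic_transfer': False,
                'host': 'sequencer.example.org',
                'user_name': 'galaxy',
                'password': 'secret',
                'data_location': '/data/runs',
                'rename_dataset': '' }

def remote_file_size( scp_configs, filepath ):
    # Same approach as ExternalService above: run du -sh over ssh and answer
    # the password prompt through a pexpect event, giving up after 10 seconds.
    cmd = 'ssh %s@%s "du -sh \'%s\'"' % ( scp_configs[ 'user_name' ], scp_configs[ 'host' ], filepath )
    return pexpect.run( cmd,
                        events={ '.ssword:*': scp_configs[ 'password' ] + '\r\n' },
                        timeout=10 )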
--- a/lib/galaxy/sample_tracking/external_service_types.py Tue Apr 12 23:41:06 2011 -0400
+++ b/lib/galaxy/sample_tracking/external_service_types.py Wed Apr 13 13:52:05 2011 -0400
@@ -86,17 +86,18 @@
         data_transfer_settings_elem = root.find( 'data_transfer_settings' )
         # Currently only data transfer using scp is supported.
         for data_transfer_elem in data_transfer_settings_elem.findall( "data_transfer" ):
-            if data_transfer_elem.get( 'type' ) == model.ExternalService.data_transfer_types.SCP:
-                scp_data_transfer = data_transfer_factories[ model.ExternalService.data_transfer_types.SCP ]
+            if data_transfer_elem.get( 'protocol' ) == model.ExternalService.data_transfer_protocol.SCP:
+                scp_data_transfer = data_transfer_factories[ model.ExternalService.data_transfer_protocol.SCP ]
                 scp_data_transfer.parse( self.config_file, data_transfer_elem )
-                self.data_transfer[ model.ExternalService.data_transfer_types.SCP ] = scp_data_transfer
+                self.data_transfer[ model.ExternalService.data_transfer_protocol.SCP ] = scp_data_transfer
     def parse_run_details( self, root ):
         self.run_details = {}
         run_details_elem = root.find( 'run_details' )
         if run_details_elem:
             results_elem = run_details_elem.find( 'results' )
             if results_elem:
-                # get the list of resulting datatypes
+                # Get the list of resulting datatypes
+                # TODO: the 'results_urls' attribute is only useful if the transfer protocol is http(s), so check if that is the case.
                 self.run_details[ 'results' ], self.run_details[ 'results_urls' ] = self.parse_run_details_results( results_elem )
     def parse_run_details_results( self, root ):
         datatypes_dict = {}

--- a/lib/galaxy/web/controllers/external_service.py Tue Apr 12 23:41:06 2011 -0400
+++ b/lib/galaxy/web/controllers/external_service.py Wed Apr 13 13:52:05 2011 -0400
@@ -176,7 +176,6 @@
             trans.sa_session.add( external_service )
             trans.sa_session.add( external_service.form_values )
             trans.sa_session.flush()
-            external_service.load_data_transfer_settings( trans )
         else:
             # We're saving a newly created external_service
             external_service_type = self.get_external_service_type( trans, external_service_type_id )
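
The parsing code above looks up a factory keyed by the protocol constants on model.ExternalService and hands it the
element to parse. A sketch of that registry pattern - the factory class here is illustrative, since the diff does not
show its definition:

class ExampleScpDataTransferFactory( object ):
    """Illustrative stand-in for the scp factory in galaxy.sample_tracking.data_transfer."""
    def __init__( self ):
        self.config = {}
    def parse( self, config_file, elem ):
        # Mirrors the attribute handling shown in data_transfer.py above.
        for attr in [ 'automatic_transfer', 'host', 'user_name', 'password', 'data_location' ]:
            self.config[ attr ] = elem.get( attr )

data_transfer_factories = { 'scp': ExampleScpDataTransferFactory() }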
--- a/lib/galaxy/web/controllers/requests_admin.py Tue Apr 12 23:41:06 2011 -0400
+++ b/lib/galaxy/web/controllers/requests_admin.py Wed Apr 13 13:52:05 2011 -0400
@@ -351,22 +351,11 @@
         request_id = kwd.get( 'request_id', None )
         external_service_id = kwd.get( 'external_service_id', None )
         files = []
-        def handle_error( **kwd ):
-            kwd[ 'status' ] = 'error'
-            return trans.response.send_redirect( web.url_for( controller='requests_admin',
-                                                              action='select_datasets_to_transfer',
-                                                              **kwd ) )
-        try:
-            request = trans.sa_session.query( trans.model.Request ).get( trans.security.decode_id( request_id ) )
-        except:
-            return invalid_id_redirect( trans, 'requests_admin', request_id )
-        try:
-            external_service = trans.sa_session.query( trans.model.ExternalService ).get( trans.security.decode_id( external_service_id ) )
-        except:
-            return invalid_id_redirect( trans, 'requests_admin', external_service_id, 'external_service', action='browse_external_services' )
-        # load the data transfer settings
+        request = trans.sa_session.query( trans.model.Request ).get( trans.security.decode_id( request_id ) )
+        external_service = trans.sa_session.query( trans.model.ExternalService ).get( trans.security.decode_id( external_service_id ) )
+        # Load the data transfer settings
         external_service.load_data_transfer_settings( trans )
-        scp_configs = external_service.data_transfer[ trans.model.ExternalService.data_transfer_types.SCP ]
+        scp_configs = external_service.data_transfer[ trans.model.ExternalService.data_transfer_protocol.SCP ]
         selected_datasets_to_transfer = util.restore_text( params.get( 'selected_datasets_to_transfer', '' ) )
         if selected_datasets_to_transfer:
             selected_datasets_to_transfer = selected_datasets_to_transfer.split(',')
@@ -383,10 +372,13 @@
         if params.get( 'select_datasets_to_transfer_button', False ):
             # Get the sample that was sequenced to produce these datasets.
             if sample_id == 'none':
+                del kwd[ 'select_datasets_to_transfer_button' ]
                 message = 'Select the sample that was sequenced to produce the datasets you want to transfer.'
                 kwd[ 'message' ] = message
-                del kwd[ 'select_datasets_to_transfer_button' ]
-                handle_error( **kwd )
+                kwd[ 'status' ] = 'error'
+                return trans.response.send_redirect( web.url_for( controller='requests_admin',
+                                                                  action='select_datasets_to_transfer',
+                                                                  **kwd ) )
             if not sample.library:
                 # Display an error if a sample has been selected that
                 # has not yet been associated with a destination library.
@@ -399,7 +391,7 @@
                                                                   status=status,
                                                                   message=message ) )
             # Save the sample datasets
-            sample_dataset_file_names = self.__save_sample_datasets( trans, sample, selected_datasets_to_transfer, external_service )
+            sample_dataset_file_names = self.__create_sample_datasets( trans, sample, selected_datasets_to_transfer, external_service )
             if sample_dataset_file_names:
                 message = 'Datasets (%s) have been selected for sample (%s)' % \
                     ( str( sample_dataset_file_names )[1:-1].replace( "'", "" ), sample.name )
@@ -426,24 +418,31 @@
         # Avoid caching
         trans.response.headers['Pragma'] = 'no-cache'
         trans.response.headers['Expires'] = '0'
-        request = trans.sa_session.query( trans.model.Request ).get( int( id ) )
-        external_service = trans.sa_session.query( trans.model.ExternalService ).get( int( external_service_id ) )
+        request = trans.sa_session.query( trans.model.Request ).get( trans.security.decode_id( request_id ) )
+        external_service = trans.sa_session.query( trans.model.ExternalService ).get( trans.security.decode_id( external_service_id ) )
         external_service.load_data_transfer_settings( trans )
-        scp_configs = external_service.data_transfer[ trans.model.ExternalService.data_transfer_types.SCP ]
-        cmd = 'ssh %s@%s "ls -oghp \'%s\'"' % ( scp_configs['user_name'],
-                                                scp_configs['host'],
+        scp_configs = external_service.data_transfer[ trans.model.ExternalService.data_transfer_protocol.SCP ]
+        cmd = 'ssh %s@%s "ls -oghp \'%s\'"' % ( scp_configs[ 'user_name' ],
+                                                scp_configs[ 'host' ],
                                                 folder_path )
+        # TODO: this currently requires rsh / ssh keys to be set.  If they are not, the process
+        # hangs.  Add an event that handles the authentication message if keys are not set - the
+        # message is something like: "Are you sure you want to continue connecting (yes/no)."
         output = pexpect.run( cmd,
-                              events={ '.ssword:*' : scp_configs[ 'password'] + '\r\n', pexpect.TIMEOUT : print_ticks },
+                              events={ '.ssword:*' : scp_configs[ 'password' ] + '\r\n',
+                                       pexpect.TIMEOUT : print_ticks },
                               timeout=10 )
+        if 'Password:\r\n' in output:
+            # Eliminate the output created using ssh from the tree
+            output = output.replace( 'Password:\r\n', '' )
         return unicode( output.replace( '\n', '<br/>' ) )
     @web.json
     def open_folder( self, trans, request_id, external_service_id, key ):
         # Avoid caching
         trans.response.headers['Pragma'] = 'no-cache'
         trans.response.headers['Expires'] = '0'
-        request = trans.sa_session.query( trans.model.Request ).get( int( request_id ) )
-        external_service = trans.sa_session.query( trans.model.ExternalService ).get( int( external_service_id ) )
+        request = trans.sa_session.query( trans.model.Request ).get( trans.security.decode_id( request_id ) )
+        external_service = trans.sa_session.query( trans.model.ExternalService ).get( trans.security.decode_id( external_service_id ) )
         folder_path = key
         files_list = self.__get_files( trans, request, external_service, folder_path )
         folder_contents = []
@@ -451,49 +450,57 @@
             is_folder = False
             if filename[-1] == os.sep:
                 is_folder = True
-            full_path = os.path.join(folder_path, filename)
-            node = {"title": filename,
-                    "isFolder": is_folder,
-                    "isLazy": is_folder,
-                    "tooltip": full_path,
-                    "key": full_path
-                    }
-            folder_contents.append(node)
+            full_path = os.path.join( folder_path, filename )
+            node = { "title": filename,
+                     "isFolder": is_folder,
+                     "isLazy": is_folder,
+                     "tooltip": full_path,
+                     "key": full_path }
+            folder_contents.append( node )
         return folder_contents
     def __get_files( self, trans, request, external_service, folder_path ):
         # Retrieves the filenames to be transferred from the remote host.
         ok = True
         external_service.load_data_transfer_settings( trans )
-        scp_configs = external_service.data_transfer[ trans.model.ExternalService.data_transfer_types.SCP ]
+        scp_configs = external_service.data_transfer[ trans.model.ExternalService.data_transfer_protocol.SCP ]
         if not scp_configs[ 'host' ] or not scp_configs[ 'user_name' ] or not scp_configs[ 'password' ]:
             status = 'error'
             message = "Error in external service login information."
             ok = False
         def print_ticks( d ):
             pass
-        cmd = 'ssh %s@%s "ls -p \'%s\'"' % ( scp_configs['user_name'], scp_configs['host'], folder_path )
+        cmd = 'ssh %s@%s "ls -p \'%s\'"' % ( scp_configs[ 'user_name' ], scp_configs[ 'host' ], folder_path )
+        # TODO: this currently requires rsh / ssh keys to be set.  If they are not, the process
+        # hangs.  Add an event that handles the authentication message if keys are not set - the
+        # message is something like: "Are you sure you want to continue connecting (yes/no)."
         output = pexpect.run( cmd,
-                              events={ '.ssword:*' : scp_configs['password'] + '\r\n', pexpect.TIMEOUT : print_ticks },
+                              events={ '.ssword:*' : scp_configs[ 'password' ] + '\r\n',
+                                       pexpect.TIMEOUT : print_ticks },
                               timeout=10 )
         if 'No such file or directory' in output:
             status = 'error'
             message = "No folder named (%s) exists on the external service." % folder_path
             ok = False
         if ok:
-            return output.splitlines()
+            if 'Password:' in output:
+                # Eliminate the output created using ssh from the tree
+                output_as_list = output.splitlines()
+                output_as_list.remove( 'Password:' )
+            else:
+                output_as_list = output.splitlines()
+            return output_as_list
         return trans.response.send_redirect( web.url_for( controller='requests_admin',
                                                           action='select_datasets_to_transfer',
                                                           request_id=trans.security.encode_id( request.id ),
                                                           status=status,
                                                           message=message ) )
-    def __save_sample_datasets( self, trans, sample, selected_datasets_to_transfer, external_service ):
+    def __create_sample_datasets( self, trans, sample, selected_datasets_to_transfer, external_service ):
         external_service.load_data_transfer_settings( trans )
-        scp_configs = external_service.data_transfer[ trans.model.ExternalService.data_transfer_types.SCP ]
+        scp_configs = external_service.data_transfer[ trans.model.ExternalService.data_transfer_protocol.SCP ]
         sample_dataset_file_names = []
         if selected_datasets_to_transfer:
             for filepath in selected_datasets_to_transfer:
-                # FIXME: handle folder selection
-                # ignore folders for now
+                # FIXME: handle folder selection - ignore folders for now
                 if filepath[-1] != os.sep:
                     name = self.__rename_dataset( sample, filepath.split( '/' )[-1], scp_configs )
                     status = trans.app.model.SampleDataset.transfer_status.NOT_STARTED
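
The TODO repeated above notes that these pexpect calls hang when ssh host keys are not already cached, because ssh
then asks a "continue connecting" question instead of prompting for a password. A hedged sketch of the kind of extra
event that could answer it - illustrative only, and note that blindly accepting host keys trades away some security:

import pexpect

def run_ssh_cmd( cmd, password, timeout=10 ):
    def print_ticks( d ):
        pass
    return pexpect.run( cmd,
                        events={ # Answer the uncached host key question, e.g.
                                 # "Are you sure you want to continue connecting (yes/no)."
                                 '(?i)continue connecting' : 'yes\r\n',
                                 '.ssword:*' : password + '\r\n',
                                 pexpect.TIMEOUT : print_ticks },
                        timeout=timeout )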
@@ -522,25 +529,26 @@
         else:
             new_name = name
         return util.sanitize_for_filename( new_name )
-    def __check_library_add_permission( self, trans, target_library, target_folder ):
+    def __ensure_library_add_permission( self, trans, target_library, target_folder ):
         """
-        Checks if the current admin user had ADD_LIBRARY permission on the target library
-        and the target folder, if not provide the permissions.
+        Ensures the current admin user has ADD_LIBRARY permission on the target data library and folder.
         """
         current_user_roles = trans.user.all_roles()
         current_user_private_role = trans.app.security_agent.get_private_user_role( trans.user )
-        # Make sure this user has LIBRARY_ADD permissions on the target library and folder.
-        # If not, give them permission.
+        flush_needed = False
         if not trans.app.security_agent.can_add_library_item( current_user_roles, target_library ):
             lp = trans.model.LibraryPermissions( trans.app.security_agent.permitted_actions.LIBRARY_ADD.action,
                                                  target_library,
                                                  current_user_private_role )
             trans.sa_session.add( lp )
+            flush_needed = True
         if not trans.app.security_agent.can_add_library_item( current_user_roles, target_folder ):
             lfp = trans.model.LibraryFolderPermissions( trans.app.security_agent.permitted_actions.LIBRARY_ADD.action,
                                                         target_folder,
                                                         current_user_private_role )
             trans.sa_session.add( lfp )
+            flush_needed = True
+        if flush_needed:
             trans.sa_session.flush()
     def __create_data_transfer_messages( self, trans, sample, selected_sample_datasets ):
         """
@@ -587,7 +595,7 @@
         messages = []
         for external_service, dataset_elem in dataset_elements.items():
             external_service.load_data_transfer_settings( trans )
-            scp_configs = external_service.data_transfer[ trans.model.ExternalService.data_transfer_types.SCP ]
+            scp_configs = external_service.data_transfer[ trans.model.ExternalService.data_transfer_protocol.SCP ]
             # Check data transfer settings
             err_msg = self.__validate_data_transfer_settings( trans, sample.request.type, scp_configs )
             if err_msg:
@@ -596,16 +604,16 @@
                                                                   sample_id=trans.security.encode_id( sample.id ),
                                                                   status='error',
                                                                   message=err_msg ) )
-            message = xml % dict( GALAXY_HOST=trans.request.host,
-                                  API_KEY=trans.user.api_keys[0].key,
-                                  DATA_HOST=scp_configs[ 'host' ],
-                                  DATA_USER=scp_configs[ 'user_name' ],
-                                  DATA_PASSWORD=scp_configs[ 'password' ],
-                                  REQUEST_ID=str( sample.request.id ),
-                                  SAMPLE_ID=str( sample.id ),
-                                  LIBRARY_ID=str( sample.library.id ),
-                                  FOLDER_ID=str( sample.folder.id ),
-                                  DATASETS=dataset_elem )
+            message = xml % dict( GALAXY_HOST=trans.request.host,
+                                  API_KEY=trans.user.api_keys[0].key,
+                                  DATA_HOST=scp_configs[ 'host' ],
+                                  DATA_USER=scp_configs[ 'user_name' ],
+                                  DATA_PASSWORD=scp_configs[ 'password' ],
+                                  REQUEST_ID=str( sample.request.id ),
+                                  SAMPLE_ID=str( sample.id ),
+                                  LIBRARY_ID=str( sample.library.id ),
+                                  FOLDER_ID=str( sample.folder.id ),
+                                  DATASETS=dataset_elem )
             messages.append( message.replace( '\n', '' ).replace( '\r', '' ) )
         return messages
     def __validate_data_transfer_settings( self, trans, request_type, scp_configs ):
@@ -620,10 +628,10 @@
             err_msg += "The 'enable_api = True' setting is not correctly set in the Galaxy config file. "
         if not trans.user.api_keys:
             err_msg += "Set your API Key in your User Preferences to transfer datasets. "
-        # check if library_import_dir is set
+        # Check if library_import_dir is set
         if not trans.app.config.library_import_dir:
             err_msg = "'The library_import_dir' setting is not correctly set in the Galaxy config file. "
-        # check the RabbitMQ server settings in the config file
+        # Check the RabbitMQ server settings in the config file
         for k, v in trans.app.config.amqp.items():
             if not v:
                 err_msg += 'Set RabbitMQ server settings in the "galaxy_amqp" section of the Galaxy config file, specifically "%s" is not set.' % k
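
__create_data_transfer_messages() above fills an XML template with plain % substitution. The template's markup is not
shown in this diff, so the element names below are invented for illustration; only the substitution keys match the
dict built in the code:

xml = ( '<data_transfer>'
        '<galaxy_host>%(GALAXY_HOST)s</galaxy_host>'
        '<api_key>%(API_KEY)s</api_key>'
        '<data_host>%(DATA_HOST)s</data_host>'
        '<data_user>%(DATA_USER)s</data_user>'
        '<data_password>%(DATA_PASSWORD)s</data_password>'
        '<request_id>%(REQUEST_ID)s</request_id>'
        '<sample_id>%(SAMPLE_ID)s</sample_id>'
        '<library_id>%(LIBRARY_ID)s</library_id>'
        '<folder_id>%(FOLDER_ID)s</folder_id>'
        '%(DATASETS)s'
        '</data_transfer>' )
message = xml % dict( GALAXY_HOST='localhost:8080',
                      API_KEY='0123456789abcdef',
                      DATA_HOST='sequencer.example.org',
                      DATA_USER='galaxy',
                      DATA_PASSWORD='secret',
                      REQUEST_ID='1',
                      SAMPLE_ID='2',
                      LIBRARY_ID='3',
                      FOLDER_ID='4',
                      DATASETS='<dataset id="5" file="/data/runs/dataset.fastq"/>' )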
@@ -632,60 +640,79 @@
     @web.expose
     @web.require_admin
     def initiate_data_transfer( self, trans, sample_id, sample_datasets=[], sample_dataset_id='' ):
-        '''
-        Initiate the transfer of the datasets from the external service to the target Galaxy data library:
-        - The admin user must have LIBRARY_ADD permission for the target library and folder
-        - Create an XML message encapsulating all the data transfer information and send it
-          to the message queue (RabbitMQ broker).
-        '''
+        # Initiate the transfer of the datasets from the external service to the target Galaxy data library.
+        # The admin user must have LIBRARY_ADD permission for the target library and folder
         try:
             sample = trans.sa_session.query( trans.model.Sample ).get( trans.security.decode_id( sample_id ) )
         except:
             return invalid_id_redirect( trans, 'requests_admin', sample_id, 'sample' )
-        err_msg = ''
-        # Make sure the current user has LIBRARY_ADD
-        # permission on the target library and folder.
-        self.__check_library_add_permission( trans, sample.library, sample.folder )
+        message = ""
+        status = "done"
+        # Make sure the current admin user has LIBRARY_ADD permission on the target data library and folder.
+        self.__ensure_library_add_permission( trans, sample.library, sample.folder )
         if sample_dataset_id and not sample_datasets:
             # Either a list of SampleDataset objects or a comma-separated string of
             # encoded SampleDataset ids can be received.  If the latter, parse the
-            # sample_dataset_id to build the list of sample_datasets.
+            # sample_dataset_id string to build the list of sample_datasets.
             id_list = util.listify( sample_dataset_id )
             for sample_dataset_id in id_list:
                 sample_dataset = trans.sa_session.query( trans.model.SampleDataset ).get( trans.security.decode_id( sample_dataset_id ) )
                 sample_datasets.append( sample_dataset )
-        # Create the message
-        messages = self.__create_data_transfer_messages( trans, sample, sample_datasets )
-        # Send the messages
-        for message in messages:
-            try:
-                conn = amqp.Connection( host=trans.app.config.amqp[ 'host' ] + ":" + trans.app.config.amqp[ 'port' ],
-                                        userid=trans.app.config.amqp[ 'userid' ],
-                                        password=trans.app.config.amqp[ 'password' ],
-                                        virtual_host=trans.app.config.amqp[ 'virtual_host' ],
-                                        insist=False )
-                chan = conn.channel()
-                msg = amqp.Message( message,
-                                    content_type='text/plain',
-                                    application_headers={ 'msg_type': 'data_transfer' } )
-                msg.properties[ "delivery_mode" ] = 2
-                chan.basic_publish( msg,
-                                    exchange=trans.app.config.amqp[ 'exchange' ],
-                                    routing_key=trans.app.config.amqp[ 'routing_key' ] )
-                chan.close()
-                conn.close()
-            except Exception, e:
-                err_msg = "Error sending the data transfer message to the Galaxy AMQP message queue:<br/>%s" % str(e)
-        if not err_msg:
-            err_msg = "%i datasets have been queued for transfer from the external service." % len( sample_datasets )
-            status = "done"
+        if trans.app.config.enable_beta_job_managers:
+            # For now, assume that all SampleDatasets use the same external service ( this may not be optimal ).
+            if sample_datasets:
+                external_service_type_id = sample_datasets[0].external_service.external_service_type_id
+                # Here external_service_type_id will be something like '454_life_sciences'
+                external_service = sample.request.type.get_external_service( external_service_type_id )
+                external_service_type = external_service.get_external_service_type( trans )
+                external_service.load_data_transfer_settings( trans )
+                # For now only scp is supported.
+                scp_configs = external_service.data_transfer[ trans.model.ExternalService.data_transfer_protocol.SCP ]
+                if not scp_configs[ 'automatic_transfer' ]:
+                    deferred_plugin = 'ManualDataTransferPlugin'
+                else:
+                    raise Exception( "Automatic data transfer using scp is not yet suppored." )
+                trans.app.job_manager.deferred_job_queue.plugins[ deferred_plugin ].create_job( trans,
+                                                                                                sample=sample,
+                                                                                                sample_datasets=sample_datasets,
+                                                                                                external_service=external_service,
+                                                                                                external_service_type=external_service_type )
         else:
-            status = 'error'
+            # TODO: Using RabbitMq for now, but eliminate this entire block when we replace RabbitMq with Galaxy's
+            # own messaging engine.  We're holding off on using the new way to transfer files manually until we
+            # implement a Galaxy-proprietary messaging engine because the deferred job plugins currently perform
+            # constant db hits to check for deferred jobs that are not in a finished state.
+            # Create the message
+            messages = self.__create_data_transfer_messages( trans, sample, sample_datasets )
+            # Send the messages
+            for rmq_msg in messages:
+                try:
+                    conn = amqp.Connection( host=trans.app.config.amqp[ 'host' ] + ":" + trans.app.config.amqp[ 'port' ],
+                                            userid=trans.app.config.amqp[ 'userid' ],
+                                            password=trans.app.config.amqp[ 'password' ],
+                                            virtual_host=trans.app.config.amqp[ 'virtual_host' ],
+                                            insist=False )
+                    chan = conn.channel()
+                    msg = amqp.Message( rmq_msg,
+                                        content_type='text/plain',
+                                        application_headers={ 'msg_type': 'data_transfer' } )
+                    msg.properties[ "delivery_mode" ] = 2
+                    chan.basic_publish( msg,
+                                        exchange=trans.app.config.amqp[ 'exchange' ],
+                                        routing_key=trans.app.config.amqp[ 'routing_key' ] )
+                    chan.close()
+                    conn.close()
+                except Exception, e:
+                    message = "Error sending the data transfer message to the Galaxy AMQP message queue:<br/>%s" % str(e)
+                    status = "error"
+            if not message:
+                message = "%i datasets have been queued for transfer from the external service." % len( sample_datasets )
+                status = "done"
         return trans.response.send_redirect( web.url_for( controller='requests_admin',
                                                           action='manage_datasets',
                                                           sample_id=trans.security.encode_id( sample.id ),
-                                                          status=status,
-                                                          message=err_msg ) )
+                                                          message=message,
+                                                          status=status ) )
     @web.expose
     def update_sample_dataset_status(self, trans, cntrller, sample_dataset_ids, new_status, error_msg=None ):
         # check if the new status is a valid transfer status

--- a/scripts/galaxy_messaging/server/amqp_consumer.py Tue Apr 12 23:41:06 2011 -0400
+++ b/scripts/galaxy_messaging/server/amqp_consumer.py Wed Apr 13 13:52:05 2011 -0400
@@ -3,7 +3,7 @@
 Galaxy uses AMQ protocol to receive messages from external sources like
 bar code scanners.  Galaxy has been tested against RabbitMQ AMQP implementation.
 For Galaxy to receive messages from a message queue the RabbitMQ server has
-to be set up with a user account and other parameters listed in the [galaxy:amq]
+to be set up with a user account and other parameters listed in the [galaxy_amqp]
 section in the universe_wsgi.ini config file
 Once the RabbitMQ server has been setup and started with the given parameters,
 this script can be run to receive messages and update the Galaxy database accordingly
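
The [galaxy_amqp] settings named above are the same ones initiate_data_transfer() reads when publishing. A condensed,
standalone sketch of that publish path using amqplib - every broker parameter here is a placeholder:

from amqplib import client_0_8 as amqp

amqp_config = { 'host': '127.0.0.1',
                'port': '5672',
                'userid': 'galaxy',
                'password': 'secret',
                'virtual_host': 'galaxy_messaging_engine',
                'exchange': 'galaxy_exchange',
                'routing_key': 'bar_code_scanner' }

conn = amqp.Connection( host=amqp_config[ 'host' ] + ':' + amqp_config[ 'port' ],
                        userid=amqp_config[ 'userid' ],
                        password=amqp_config[ 'password' ],
                        virtual_host=amqp_config[ 'virtual_host' ],
                        insist=False )
chan = conn.channel()
msg = amqp.Message( '<data_transfer>...</data_transfer>',
                    content_type='text/plain',
                    application_headers={ 'msg_type': 'data_transfer' } )
msg.properties[ 'delivery_mode' ] = 2  # persistent
chan.basic_publish( msg,
                    exchange=amqp_config[ 'exchange' ],
                    routing_key=amqp_config[ 'routing_key' ] )
chan.close()
conn.close()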
""" - -import os, sys, optparse, ConfigParser, socket, SocketServer, threading, logging, random - -import urllib2, tempfile - -import time - -log = logging.getLogger( __name__ ) -log.setLevel( logging.INFO ) -handler = logging.StreamHandler( sys.stdout ) -log.addHandler( handler ) +import os, sys, optparse, ConfigParser, socket, SocketServer, threading, logging, random, urllib2, tempfile, time galaxy_root = os.path.abspath( os.path.join( os.path.dirname( __file__ ), '..' ) ) sys.path.insert( 0, os.path.abspath( os.path.join( galaxy_root, 'lib' ) ) ) from galaxy import eggs + +import pkg_resources +pkg_resources.require( "pexpect" ) +import pexpect + eggs.require( "SQLAlchemy >= 0.4" ) from sqlalchemy import * @@ -32,6 +27,11 @@ eggs.require( 'python_daemon' ) from daemon import DaemonContext +log = logging.getLogger( __name__ ) +log.setLevel( logging.DEBUG ) +handler = logging.StreamHandler( sys.stdout ) +log.addHandler( handler ) + debug = False slow = False @@ -49,7 +49,7 @@ def parse( self ): self.opts, args = self.parser.parse_args() if len( args ) != 1: - log.error( 'usage: transfer.py [options] <transfer job id>' ) + log.error( 'usage: transfer.py <transfer job id>' ) sys.exit( 1 ) try: self.transfer_job_id = int( args[0] ) @@ -138,57 +138,55 @@ self.result = result def transfer( app, transfer_job_id ): - transfer_job = app.get_transfer_job( transfer_job_id ) if transfer_job is None: log.error( 'Invalid transfer job ID: %s' % transfer_job_id ) return False - port_range = app.config.get( 'app:main', 'transfer_worker_port_range' ) try: port_range = [ int( p ) for p in port_range.split( '-' ) ] except Exception, e: log.error( 'Invalid port range set in transfer_worker_port_range: %s: %s' % ( port_range, str( e ) ) ) return False - - protocol = transfer_job.params['url'].split( '://' )[0] - if protocol not in ( 'http', 'https' ): + protocol = transfer_job.params[ 'protocol' ] + if protocol not in ( 'http', 'https', 'scp' ): log.error( 'Unsupported protocol: %s' % protocol ) return False - state_result = StateResult( result = dict( state = transfer_job.states.RUNNING, info='Transfer process starting up.' ) ) - listener_server = ListenerServer( range( port_range[0], port_range[1] + 1 ), ListenerRequestHandler, app, transfer_job, state_result ) - # daemonize here (if desired) if not debug: daemon_context = DaemonContext( files_preserve=[ listener_server.fileno() ], working_directory=os.getcwd() ) daemon_context.open() - # If this fails, it'll never be detected. Hopefully it won't fail since it succeeded once. 
         app.connect_database() # daemon closed the database fd
         transfer_job = app.get_transfer_job( transfer_job_id )
-
     listener_thread = threading.Thread( target=listener_server.serve_forever )
     listener_thread.setDaemon( True )
     listener_thread.start()
-
     # Store this process' pid so unhandled deaths can be handled by the restarter
     transfer_job.pid = os.getpid()
     app.sa_session.add( transfer_job )
     app.sa_session.flush()
-
     terminal_state = None
-    if protocol in ( 'http', 'https' ):
-        for state in http_transfer( transfer_job ):
-            state_result.result = state
-            if state['state'] in transfer_job.terminal_states:
-                terminal_state = state
+    if protocol in [ 'http', 'https' ]:
+        for transfer_result_dict in http_transfer( transfer_job ):
+            state_result.result = transfer_result_dict
+            if transfer_result_dict[ 'state' ] in transfer_job.terminal_states:
+                terminal_state = transfer_result_dict
+    elif protocol in [ 'scp' ]:
+        # Transfer the file using scp
+        transfer_result_dict = scp_transfer( transfer_job )
+        # Handle the state of the transfer
+        state = transfer_result_dict[ 'state' ]
+        state_result.result = transfer_result_dict
+        if state in transfer_job.terminal_states:
+            terminal_state = transfer_result_dict
     if terminal_state is not None:
-        transfer_job.state = terminal_state['state']
-        for name in ( 'info', 'path' ):
+        transfer_job.state = terminal_state[ 'state' ]
+        for name in [ 'info', 'path' ]:
             if name in terminal_state:
-                transfer_job.__setattr__( name, terminal_state[name] )
+                transfer_job.__setattr__( name, terminal_state[ name ] )
     else:
         transfer_job.state = transfer_job.states.ERROR
         transfer_job.info = 'Unknown error encountered by transfer worker.'
@@ -197,9 +195,7 @@
         return True

 def http_transfer( transfer_job ):
-    """
-    "Plugin" for handling http(s) transfers.
-    """
+    """Plugin" for handling http(s) transfers."""
     url = transfer_job.params['url']
     try:
         f = urllib2.urlopen( url )
@@ -243,16 +239,41 @@
             return
     return

+def scp_transfer( transfer_job ):
+    """Plugin" for handling scp transfers using pexpect"""
+    def print_ticks( d ):
+        pass
+    host = transfer_job.params[ 'host' ]
+    user_name = transfer_job.params[ 'user_name' ]
+    password = transfer_job.params[ 'password' ]
+    file_path = transfer_job.params[ 'file_path' ]
+    try:
+        fh, fn = tempfile.mkstemp()
+    except Exception, e:
+        return dict( state = transfer_job.states.ERROR, info = 'Unable to create temporary file for transfer: %s' % str( e ) )
+    try:
+        # TODO: add the ability to determine progress of the copy here like we do in the http_transfer above.
+        cmd = "scp %s@%s:'%s' '%s'" % ( user_name,
+                                        host,
+                                        file_path.replace( ' ', '\ ' ),
+                                        fn )
+        output = pexpect.run( cmd,
+                              events={ '.ssword:*': password + '\r\n',
+                                       pexpect.TIMEOUT: print_ticks },
+                              timeout=10 )
+        return dict( state = transfer_job.states.DONE, path = fn )
+    except Exception, e:
+        return dict( state = transfer_job.states.ERROR, info = 'Error during file transfer: %s' % str( e ) )
+
 if __name__ == '__main__':
-
     arg_handler = ArgHandler()
     arg_handler.parse()
     app = GalaxyApp( arg_handler.opts.config )
-    log.debug( 'Initiating transfer' )
+    log.debug( 'Initiating transfer...' )
     if transfer( app, arg_handler.transfer_job_id ):
         log.debug( 'Finished' )
     else:
-        log.error( 'Error in transfer process' )
+        log.error( 'Error in transfer process...' )
         sys.exit( 1 )
     sys.exit( 0 )
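
The TODO in scp_transfer() above asks for progress reporting comparable to http_transfer(). One hedged approach,
illustrative and not part of this changeset: when the remote size is known up front ( e.g. from the du -sh probe ),
spawn scp and poll the growing destination file, yielding state dicts the way http_transfer() does. This assumes a
password prompt actually appears:

import os, time
import pexpect

def scp_with_progress( user_name, host, file_path, local_path, password, total_size ):
    cmd = "scp %s@%s:'%s' '%s'" % ( user_name, host, file_path, local_path )
    child = pexpect.spawn( cmd, timeout=30 )
    child.expect( '.ssword:*' )
    child.sendline( password )
    while child.isalive():
        if total_size and os.path.exists( local_path ):
            # Percent complete, derived from the destination file's growth.
            percent = 100 * os.path.getsize( local_path ) / total_size
            yield dict( state='progress', percent='%s' % percent )
        time.sleep( 1 )
    yield dict( state='done', path=local_path )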
--- a/templates/admin/requests/select_datasets_to_transfer.mako Tue Apr 12 23:41:06 2011 -0400
+++ b/templates/admin/requests/select_datasets_to_transfer.mako Wed Apr 13 13:52:05 2011 -0400
@@ -23,56 +23,60 @@
         minExpandLevel: 0, // 1: root node is not collapsible
         persist: false,
         checkbox: true,
-        selectMode: 3,
+        selectMode: 3,
         onPostInit: function(isReloading, isError) {
-//            alert("reloading: "+isReloading+", error:"+isError);
-            logMsg("onPostInit(%o, %o) - %o", isReloading, isError, this);
-            // Re-fire onActivate, so the text is updated
-            this.reactivate();
+            //alert("reloading: "+isReloading+", error:"+isError);
+            logMsg("onPostInit(%o, %o) - %o", isReloading, isError, this);
+            // Re-fire onActivate, so the text is updated
+            this.reactivate();
         },
         fx: { height: "toggle", duration: 200 },
-        // initAjax is hard to fake, so we pass the children as object array:
-        initAjax: {url: "${h.url_for( controller='requests_admin', action='open_folder' )}",
-                   dataType: "json",
-                   data: { request_id: "${request.id}", external_service_id: "${external_service.id}", key: "${scp_configs['data_location']}" },
-        },
-        onLazyRead: function(dtnode){
-            dtnode.appendAjax({
-                url: "${h.url_for( controller='requests_admin', action='open_folder' )}",
-                dataType: "json",
-                data: { request_id: "${request.id}", external_service_id: "${external_service.id}", key: dtnode.data.key },
+        // initAjax is hard to fake, so we pass the children as object array:
+        initAjax: {url: "${h.url_for( controller='requests_admin', action='open_folder' )}",
+                   dataType: "json",
+                   data: { request_id: "${trans.security.encode_id( request.id )}", external_service_id: "${trans.security.encode_id( external_service.id )}", key: "${scp_configs['data_location']}" },
+        },
+        onLazyRead: function(dtnode){
+            dtnode.appendAjax({
+                url: "${h.url_for( controller='requests_admin', action='open_folder' )}",
+                dataType: "json",
+                data: { request_id: "${trans.security.encode_id( request.id )}", external_service_id: "${trans.security.encode_id( external_service.id )}", key: dtnode.data.key },
+            });
+        },
+        onSelect: function(select, dtnode) {
+            // Display list of selected nodes
+            var selNodes = dtnode.tree.getSelectedNodes();
+            // convert to title/key array
+            var selKeys = $.map(selNodes, function(node){
+                return node.data.key;
+            });
+            document.select_datasets_to_transfer.selected_datasets_to_transfer.value = selKeys.join(",")
+        },
+        onActivate: function(dtnode) {
+            var cell = $("#file_details");
+            var selected_value;
+            if (dtnode.data.key == 'root') {
+                selected_value = "${scp_configs['data_location']}/";
+            } else {
+                selected_value = dtnode.data.key;
+            };
+            if (selected_value.charAt(selected_value.length-1) != '/') {
+                // Make ajax call
+                $.ajax( {
+                    type: "POST",
+                    url: "${h.url_for( controller='requests_admin', action='get_file_details' )}",
+                    dataType: "json",
+                    data: { request_id: "${trans.security.encode_id(request.id)}", external_service_id: "${trans.security.encode_id(external_service.id)}", folder_path: selected_value },
+                    success : function ( data ) {
+                        cell.html( '<label>'+data+'</label>' )
+                    }
                 });
-            },
-            onSelect: function(select, dtnode) {
-                // Display list of selected nodes
-                var selNodes = dtnode.tree.getSelectedNodes();
-                // convert to title/key array
-                var selKeys = $.map(selNodes, function(node){
-                    return node.data.key;
-                });
-                document.select_datasets_to_transfer.selected_datasets_to_transfer.value = selKeys.join(",")
-            },
-            onActivate: function(dtnode) {
-                var cell = $("#file_details");
-                var selected_value = dtnode.data.key
-                if(selected_value.charAt(selected_value.length-1) != '/') {
-                    // Make ajax call
-                    $.ajax( {
-                        type: "POST",
-                        url: "${h.url_for( controller='requests_admin', action='get_file_details' )}",
-                        dataType: "json",
-                        data: { request_id: "${request.id}", external_service_id: "${external_service.id}", folder_path: dtnode.data.key },
-                        success : function ( data ) {
-                            cell.html( '<label>'+data+'</label>' )
-                        }
-                    });
             } else {
-                cell.html( '' )
-            }
-        },
-    });
+                cell.html( '' );
+            };
+        },
+    });
     });
-
 </script>
 <%
Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--

This is a commit notification from bitbucket.org. You are receiving this
because you have the service enabled, addressing the recipient of this
email.