details: http://www.bx.psu.edu/hg/galaxy/rev/fe14a58568ad changeset: 3678:fe14a58568ad user: rc date: Wed Apr 21 16:42:09 2010 -0400 description: lims: fixed data transfer bugs diffstat: lib/galaxy/web/controllers/requests_admin.py | 2 +- scripts/galaxy_messaging/server/amqp_consumer.py | 15 +- scripts/galaxy_messaging/server/data_transfer.py | 29 ++- scripts/galaxy_messaging/server/galaxyweb_interface.py | 140 +++++++--------- 4 files changed, 90 insertions(+), 96 deletions(-) diffs (314 lines): diff -r e600ab3fadc1 -r fe14a58568ad lib/galaxy/web/controllers/requests_admin.py --- a/lib/galaxy/web/controllers/requests_admin.py Wed Apr 21 11:44:19 2010 -0400 +++ b/lib/galaxy/web/controllers/requests_admin.py Wed Apr 21 16:42:09 2010 -0400 @@ -1680,7 +1680,7 @@ virtual_host=trans.app.config.amqp['virtual_host'], insist=False) chan = conn.channel() - msg = amqp.Message(data, + msg = amqp.Message(data.replace('\n', '').replace('\r', ''), content_type='text/plain', application_headers={'msg_type': 'data_transfer'}) msg.properties["delivery_mode"] = 2 diff -r e600ab3fadc1 -r fe14a58568ad scripts/galaxy_messaging/server/amqp_consumer.py --- a/scripts/galaxy_messaging/server/amqp_consumer.py Wed Apr 21 11:44:19 2010 -0400 +++ b/scripts/galaxy_messaging/server/amqp_consumer.py Wed Apr 21 16:42:09 2010 -0400 @@ -37,6 +37,7 @@ log.addHandler(fh) global dbconnstr +global config def get_value(dom, tag_name): ''' @@ -64,17 +65,20 @@ return rc def recv_callback(msg): + global config # check the meesage type. msg_type = msg.properties['application_headers'].get('msg_type') log.debug('\nMESSAGE RECVD: '+str(msg_type)) if msg_type == 'data_transfer': log.debug('DATA TRANSFER') # fork a new process to transfer datasets - transfer_script = "scripts/galaxy_messaging/server/data_transfer.py" - cmd = ( "python", - transfer_script, - msg.body ) - pid = subprocess.Popen(cmd).pid + transfer_script = os.path.join(os.getcwd(), + "scripts/galaxy_messaging/server/data_transfer.py") + cmd = '%s "%s" "%s" "%s"' % ("python", + transfer_script, + msg.body, + config.get("app:main", "id_secret") ) + pid = subprocess.Popen(cmd, shell=True).pid log.debug('Started process (%i): %s' % (pid, str(cmd))) elif msg_type == 'sample_state_update': log.debug('SAMPLE STATE UPDATE') @@ -95,6 +99,7 @@ if len(sys.argv) < 2: print 'Usage: python amqp_consumer.py <Galaxy config file>' return + global config config = ConfigParser.ConfigParser() config.read(sys.argv[1]) global dbconnstr diff -r e600ab3fadc1 -r fe14a58568ad scripts/galaxy_messaging/server/data_transfer.py --- a/scripts/galaxy_messaging/server/data_transfer.py Wed Apr 21 11:44:19 2010 -0400 +++ b/scripts/galaxy_messaging/server/data_transfer.py Wed Apr 21 16:42:09 2010 -0400 @@ -8,7 +8,7 @@ Usage: -python data_transfer.py <data_transfer_xml> +python data_transfer.py <data_transfer_xml> <config_id_secret> """ @@ -57,7 +57,7 @@ class DataTransfer(object): - def __init__(self, msg): + def __init__(self, msg, config_id_secret): log.info(msg) self.dom = xml.dom.minidom.parseString(msg) self.host = self.get_value(self.dom, 'data_host') @@ -67,6 +67,7 @@ self.library_id = self.get_value(self.dom, 'library_id') self.folder_id = self.get_value(self.dom, 'folder_id') self.dataset_files = [] + self.config_id_secret = config_id_secret count=0 while True: index = self.get_value_index(self.dom, 'index', count) @@ -137,7 +138,7 @@ ''' log.error(traceback.format_exc()) log.error('FATAL ERROR.'+msg) - self.update_status('Error.', 'All', msg) + self.update_status('Error', 'All', msg+"\n"+traceback.format_exc()) sys.exit(1) def transfer_files(self): @@ -175,18 +176,24 @@ This method adds the dataset file to the target data library & folder by opening the corresponding url in Galaxy server running. ''' - self.update_status(Sample.transfer_status.ADD_TO_LIBRARY) - galaxyweb = GalaxyWebInterface(self.server_host, self.server_port, - self.datatx_email, self.datatx_password) - galaxyweb.add_to_library(self.server_dir, self.library_id, self.folder_id) - galaxyweb.logout() - + try: + self.update_status(Sample.transfer_status.ADD_TO_LIBRARY) + log.debug("dir:%s, lib:%s, folder:%s" % (self.server_dir, str(self.library_id), str(self.folder_id))) + galaxyweb = GalaxyWebInterface(self.server_host, self.server_port, + self.datatx_email, self.datatx_password, + self.config_id_secret) + galaxyweb.add_to_library(self.server_dir, self.library_id, self.folder_id) + galaxyweb.logout() + except Exception, e: + log.debug(e) + self.error_and_exit(str(e)) + def update_status(self, status, dataset_index='All', msg=''): ''' Update the data transfer status for this dataset in the database ''' try: - log.debug('Setting status "%s" for sample "%s"' % ( status, str(dataset_index) ) ) + log.debug('Setting status "%s" for dataset "%s"' % ( status, str(dataset_index) ) ) df = from_json_string(self.galaxydb.get_sample_dataset_files(self.sample_id)) if dataset_index == 'All': for dataset in self.dataset_files: @@ -240,7 +247,7 @@ # # Start the daemon # - dt = DataTransfer(sys.argv[1]) + dt = DataTransfer(sys.argv[1], sys.argv[2]) dt.start() sys.exit(0) diff -r e600ab3fadc1 -r fe14a58568ad scripts/galaxy_messaging/server/galaxyweb_interface.py --- a/scripts/galaxy_messaging/server/galaxyweb_interface.py Wed Apr 21 11:44:19 2010 -0400 +++ b/scripts/galaxy_messaging/server/galaxyweb_interface.py Wed Apr 21 16:42:09 2010 -0400 @@ -1,6 +1,5 @@ import ConfigParser import sys, os -import serial import array import time import optparse,array @@ -24,97 +23,81 @@ class GalaxyWebInterface(object): - def __init__(self, server_host, server_port, datatx_email, datatx_password): - self.server_host = server_host#config.get("main", "server_host") - self.server_port = server_port#config.get("main", "server_port") - self.datatx_email = datatx_email#config.get("main", "datatx_email") - self.datatx_password = datatx_password#config.get("main", "datatx_password") - try: - # create url - self.base_url = "http://%s:%s" % (self.server_host, self.server_port) - # login - url = "%s/user/login?email=%s&password=%s&login_button=Login" % (self.base_url, self.datatx_email, self.datatx_password) - cj = cookielib.CookieJar() - self.opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj)) - #print url + def __init__(self, server_host, server_port, datatx_email, datatx_password, config_id_secret): + self.server_host = server_host + self.server_port = server_port + self.datatx_email = datatx_email + self.datatx_password = datatx_password + self.config_id_secret = config_id_secret + # create url + self.base_url = "http://%s:%s" % (self.server_host, self.server_port) + # login + url = "%s/user/login?email=%s&password=%s&login_button=Login" % (self.base_url, self.datatx_email, self.datatx_password) + cj = cookielib.CookieJar() + self.opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj)) + #print url + f = self.opener.open(url) + if f.read().find("ogged in as "+self.datatx_email) == -1: + # if the user doesnt exist, create the user + url = "%s/user/create?email=%s&username=%s&password=%s&confirm=%s&create_user_button=Submit" % ( self.base_url, self.datatx_email, self.datatx_email, self.datatx_password, self.datatx_password ) f = self.opener.open(url) if f.read().find("ogged in as "+self.datatx_email) == -1: - # if the user doesnt exist, create the user - url = "%s/user/create?email=%s&username=%s&password=%s&confirm=%s&create_user_button=Submit" % ( self.base_url, self.datatx_email, self.datatx_email, self.datatx_password, self.datatx_password ) - f = self.opener.open(url) - if f.read().find("ogged in as "+self.datatx_email) == -1: - raise "The "+self.datatx_email+" user could not login to Galaxy" - except: - print traceback.format_exc() - sys.exit(1) + raise Exception("The "+self.datatx_email+" user could not login to Galaxy") def add_to_library(self, server_dir, library_id, folder_id, dbkey=''): ''' This method adds the dataset file to the target data library & folder by opening the corresponding url in Galaxy server running. ''' - try: - params = urllib.urlencode(dict( cntrller='library_admin', - tool_id='upload1', - tool_state='None', - library_id=self.encode_id(library_id), - folder_id=self.encode_id(folder_id), - upload_option='upload_directory', - file_type='auto', - server_dir=os.path.basename(server_dir), - dbkey=dbkey, - show_dataset_id='True', - runtool_btn='Upload to library')) - #url = "http://localhost:8080/library_common/upload_library_dataset?cntrller=library_admin&tool_id=upload1&tool_state=None&library_id=adb5f5c93f827949&folder_id=adb5f5c93f827949&upload_option=upload_directory&file_type=auto&server_dir=003&dbkey=%3F&message=&runtool_btn=Upload+to+library" - #url = base_url+"/library_common/upload_library_dataset?library_id=adb5f5c93f827949&tool_id=upload1&file_type=auto&server_dir=datatx_22858&dbkey=%3F&upload_option=upload_directory&folder_id=529fd61ab1c6cc36&cntrller=library_admin&tool_state=None&runtool_btn=Upload+to+library" - url = self.base_url+"/library_common/upload_library_dataset" - #print url - #print params - f = self.opener.open(url, params) - if f.read().find("Data Library") == -1: - raise "Dataset could not be uploaded to the data library" - except: - print traceback.format_exc() - sys.exit(1) + params = urllib.urlencode(dict( cntrller='library_admin', + tool_id='upload1', + tool_state='None', + library_id=self.encode_id(library_id), + folder_id=self.encode_id(folder_id), + upload_option='upload_directory', + file_type='auto', + server_dir=os.path.basename(server_dir), + dbkey=dbkey, + show_dataset_id='True', + runtool_btn='Upload to library')) + #url = "http://localhost:8080/library_common/upload_library_dataset?cntrller=library_admin&tool_id=upload1&tool_state=None&library_id=adb5f5c93f827949&folder_id=adb5f5c93f827949&upload_option=upload_directory&file_type=auto&server_dir=003&dbkey=%3F&message=&runtool_btn=Upload+to+library" + #url = base_url+"/library_common/upload_library_dataset?library_id=adb5f5c93f827949&tool_id=upload1&file_type=auto&server_dir=datatx_22858&dbkey=%3F&upload_option=upload_directory&folder_id=529fd61ab1c6cc36&cntrller=library_admin&tool_state=None&runtool_btn=Upload+to+library" + url = self.base_url+"/library_common/upload_library_dataset" + #print url + #print params + f = self.opener.open(url, params) + if f.read().find("Data Library") == -1: + raise Exception("Dataset could not be uploaded to the data library. URL: %s, PARAMS=%s" % (url, params)) def import_to_history(self, ldda_id, library_id, folder_id): - try: - params = urllib.urlencode(dict( cntrller='library_admin', - show_deleted='False', - library_id=self.encode_id(library_id), - folder_id=self.encode_id(folder_id), - ldda_ids=self.encode_id(ldda_id), - do_action='import_to_history', - use_panels='False')) - #url = "http://lion.bx.psu.edu:8080/library_common/act_on_multiple_datasets?library_id=adb5f5c93f827949&show_deleted=False&ldda_ids=adb5f5c93f827949&cntrller=library_admin&do_action=import_to_history&use_panels=False" - #url = base_url+"/library_common/upload_library_dataset?library_id=adb5f5c93f827949&tool_id=upload1&file_type=auto&server_dir=datatx_22858&dbkey=%3F&upload_option=upload_directory&folder_id=529fd61ab1c6cc36&cntrller=library_admin&tool_state=None&runtool_btn=Upload+to+library" - url = self.base_url+"/library_common/act_on_multiple_datasets" - #print url - #print params - f = self.opener.open(url, params) - x = f.read() - if x.find("1 dataset(s) have been imported into your history.") == -1: - #print x - raise Exception("Dataset could not be imported into history") - except: - print traceback.format_exc() - sys.exit(1) - + params = urllib.urlencode(dict( cntrller='library_admin', + show_deleted='False', + library_id=self.encode_id(library_id), + folder_id=self.encode_id(folder_id), + ldda_ids=self.encode_id(ldda_id), + do_action='import_to_history', + use_panels='False')) + #url = "http://lion.bx.psu.edu:8080/library_common/act_on_multiple_datasets?library_id=adb5f5c93f827949&show_deleted=False&ldda_ids=adb5f5c93f827949&cntrller=library_admin&do_action=import_to_history&use_panels=False" + #url = base_url+"/library_common/upload_library_dataset?library_id=adb5f5c93f827949&tool_id=upload1&file_type=auto&server_dir=datatx_22858&dbkey=%3F&upload_option=upload_directory&folder_id=529fd61ab1c6cc36&cntrller=library_admin&tool_state=None&runtool_btn=Upload+to+library" + url = self.base_url+"/library_common/act_on_multiple_datasets" + #print url + #print params + f = self.opener.open(url, params) + x = f.read() + if x.find("1 dataset(s) have been imported into your history.") == -1: + #print x + raise Exception("Dataset could not be imported into history") def run_workflow(self, workflow_id, hid, workflow_step): input = str(workflow_step)+'|input' - try: - params = urllib.urlencode({'id':self.encode_id(workflow_id), - 'run_workflow': 'Run workflow', - input: hid}) - url = self.base_url+"/workflow/run" - #print url+'?'+params - f = self.opener.open(url, params) + params = urllib.urlencode({'id':self.encode_id(workflow_id), + 'run_workflow': 'Run workflow', + input: hid}) + url = self.base_url+"/workflow/run" + #print url+'?'+params + f = self.opener.open(url, params) # if f.read().find("1 dataset(s) have been imported into your history.") == -1: # raise Exception("Error in running the workflow") - except: - print traceback.format_exc() - sys.exit(1) def logout(self): @@ -122,8 +105,7 @@ f = self.opener.open(self.base_url+'/user/logout') def encode_id(self, obj_id ): - id_secret = 'changethisinproductiontoo' - id_cipher = Blowfish.new( id_secret ) + id_cipher = Blowfish.new( self.config_id_secret ) # Convert to string s = str( obj_id ) # Pad to a multiple of 8 with leading "!"