details: http://www.bx.psu.edu/hg/galaxy/rev/076f572d7c9d changeset: 3674:076f572d7c9d user: rc date: Wed Apr 21 10:41:30 2010 -0400 description: lims: - data transfer now uses rabbitmq - datasets can now be renamed before transfering from the sequencer - data transfer code refactored diffstat: lib/galaxy/config.py | 4 + lib/galaxy/model/__init__.py | 28 +- lib/galaxy/web/controllers/requests_admin.py | 121 +++++++- lib/galaxy/web/framework/__init__.py | 1 + run_galaxy_listener.sh | 2 +- scripts/galaxy_messaging/client/amqp_publisher.py | 4 +- scripts/galaxy_messaging/server/amqp_consumer.py | 66 +++- scripts/galaxy_messaging/server/data_transfer.py | 241 +++++++++------- scripts/galaxy_messaging/server/galaxydb_interface.py | 17 +- scripts/galaxy_messaging/server/galaxyweb_interface.py | 132 +++++++++ templates/admin/requests/dataset.mako | 71 +++++ templates/admin/requests/get_data.mako | 67 ++- universe_wsgi.ini.sample | 2 +- 13 files changed, 577 insertions(+), 179 deletions(-) diffs (1118 lines): diff -r 207d0d70483b -r 076f572d7c9d lib/galaxy/config.py --- a/lib/galaxy/config.py Tue Apr 20 15:36:03 2010 -0400 +++ b/lib/galaxy/config.py Wed Apr 21 10:41:30 2010 -0400 @@ -123,6 +123,10 @@ self.enable_cloud_execution = string_as_bool( kwargs.get( 'enable_cloud_execution', 'True' ) ) else: self.enable_cloud_execution = string_as_bool( kwargs.get( 'enable_cloud_execution', 'False' ) ) + # Galaxy messaging (AMQP) configuration options + self.amqp = {} + for k, v in global_conf_parser.items("galaxy_amqp"): + self.amqp[k] = v def get( self, key, default ): return self.config_dict.get( key, default ) def get_bool( self, key, default ): diff -r 207d0d70483b -r 076f572d7c9d lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py Tue Apr 20 15:36:03 2010 -0400 +++ b/lib/galaxy/model/__init__.py Wed Apr 21 10:41:30 2010 -0400 @@ -18,6 +18,7 @@ import logging log = logging.getLogger( __name__ ) from sqlalchemy.orm import object_session +import pexpect datatypes_registry = galaxy.datatypes.registry.Registry() #Default Value Required for unit tests @@ -1455,7 +1456,9 @@ class Sample( object ): transfer_status = Bunch( NOT_STARTED = 'Not started', - IN_PROGRESS = 'In progress', + IN_QUEUE = 'In queue', + TRANSFERRING = 'Transferring dataset', + ADD_TO_LIBRARY = 'Adding to data library', COMPLETE = 'Complete', ERROR = 'Error') def __init__(self, name=None, desc=None, request=None, form_values=None, @@ -1474,22 +1477,33 @@ return None def untransferred_dataset_files(self): count = 0 - for df, status in self.dataset_files: - if status == self.transfer_status.NOT_STARTED: + for df in self.dataset_files: + if df['status'] == self.transfer_status.NOT_STARTED: count = count + 1 return count def inprogress_dataset_files(self): count = 0 - for df, status in self.dataset_files: - if status == self.transfer_status.IN_PROGRESS: + for df in self.dataset_files: + if df['status'] not in [self.transfer_status.NOT_STARTED, self.transfer_status.COMPLETE]: count = count + 1 return count def transferred_dataset_files(self): count = 0 - for df, status in self.dataset_files: - if status == self.transfer_status.COMPLETE: + for df in self.dataset_files: + if df['status'] == self.transfer_status.COMPLETE: count = count + 1 return count + def dataset_size(self, filepath): + def print_ticks(d): + pass + datatx_info = self.request.type.datatx_info + cmd = 'ssh %s@%s "du -sh %s"' % ( datatx_info['username'], + datatx_info['host'], + filepath) + output = pexpect.run(cmd, events={'.ssword:*': 
datatx_info['password']+'\r\n', + pexpect.TIMEOUT:print_ticks}, + timeout=10) + return output.split('\t')[0] class SampleState( object ): def __init__(self, name=None, desc=None, request_type=None): diff -r 207d0d70483b -r 076f572d7c9d lib/galaxy/web/controllers/requests_admin.py --- a/lib/galaxy/web/controllers/requests_admin.py Tue Apr 20 15:36:03 2010 -0400 +++ b/lib/galaxy/web/controllers/requests_admin.py Wed Apr 21 10:41:30 2010 -0400 @@ -12,6 +12,7 @@ from sqlalchemy.sql import select import pexpect import ConfigParser, threading, time +from amqplib import client_0_8 as amqp log = logging.getLogger( __name__ ) @@ -64,7 +65,6 @@ .filter( self.event_class.table.c.id.in_(select(columns=[func.max(self.event_class.table.c.id)], from_obj=self.event_class.table, group_by=self.event_class.table.c.request_id))) - #print column_filter, q return q def get_accepted_filters( self ): """ Returns a list of accepted filters for this column. """ @@ -1509,8 +1509,11 @@ params = util.Params( kwd ) message = util.restore_text( params.get( 'message', '' ) ) status = params.get( 'status', 'done' ) - folder_path = util.restore_text( params.get( 'folder_path', '' ) ) + folder_path = util.restore_text( params.get( 'folder_path', + sample.request.type.datatx_info['data_dir'] ) ) files_list = util.listify( params.get( 'files_list', '' ) ) + if params.get( 'start_transfer_button', False ) == 'True': + return self.__start_datatx(trans, sample) if not folder_path: return trans.fill_template( '/admin/requests/get_data.mako', sample=sample, files=[], @@ -1544,32 +1547,43 @@ dataset_files=sample.dataset_files, folder_path=folder_path ) elif params.get( 'remove_dataset_button', False ): + # get the filenames from the remote host + files = self.__get_files(trans, sample, folder_path) dataset_index = int(params.get( 'dataset_index', 0 )) del sample.dataset_files[dataset_index] trans.sa_session.add( sample ) trans.sa_session.flush() return trans.fill_template( '/admin/requests/get_data.mako', - sample=sample, - dataset_files=sample.dataset_files) - elif params.get( 'start_transfer_button', False ): + sample=sample, files=files, + dataset_files=sample.dataset_files, + folder_path=folder_path) + elif params.get( 'select_files_button', False ): folder_files = [] if len(files_list): for f in files_list: + filepath = os.path.join(folder_path, f) if f[-1] == os.sep: # the selected item is a folder so transfer all the # folder contents - self.__get_files_in_dir(trans, sample, os.path.join(folder_path, f)) + self.__get_files_in_dir(trans, sample, filepath) else: - sample.dataset_files.append([os.path.join(folder_path, f), - sample.transfer_status.NOT_STARTED]) + sample.dataset_files.append(dict(filepath=filepath, + status=sample.transfer_status.NOT_STARTED, + name=filepath.split('/')[-1], + error_msg='', + size=sample.dataset_size(filepath))) trans.sa_session.add( sample ) trans.sa_session.flush() - return self.__start_datatx(trans, sample) return trans.response.send_redirect( web.url_for( controller='requests_admin', action='show_datatx_page', sample_id=trans.security.encode_id(sample.id), folder_path=folder_path)) + return trans.response.send_redirect( web.url_for( controller='requests_admin', + action='show_datatx_page', + sample_id=trans.security.encode_id(sample.id), + folder_path=folder_path)) + def __setup_datatx_user(self, trans, library, folder): ''' This method sets up the datatx user: @@ -1620,7 +1634,62 @@ trans.sa_session.add( dp ) trans.sa_session.flush() return datatx_user - + + def __send_message(self, trans, 
datatx_info, sample): + ''' + This method creates the xml message and sends it to the rabbitmq server + ''' + # first create the xml message based on the following template + xml = \ + ''' <data_transfer> + <data_host>%(DATA_HOST)s</data_host> + <data_user>%(DATA_USER)s</data_user> + <data_password>%(DATA_PASSWORD)s</data_password> + <sample_id>%(SAMPLE_ID)s</sample_id> + <library_id>%(LIBRARY_ID)s</library_id> + <folder_id>%(FOLDER_ID)s</folder_id> + %(DATASETS)s + </data_transfer>''' + dataset_xml = \ + '''<dataset> + <index>%(INDEX)s</index> + <name>%(NAME)s</name> + <file>%(FILE)s</file> + </dataset>''' + datasets = '' + for index, dataset in enumerate(sample.dataset_files): + if dataset['status'] == sample.transfer_status.NOT_STARTED: + datasets = datasets + dataset_xml % dict(INDEX=str(index), + NAME=dataset['name'], + FILE=dataset['filepath']) + sample.dataset_files[index]['status'] = sample.transfer_status.IN_QUEUE + + trans.sa_session.add( sample ) + trans.sa_session.flush() + data = xml % dict(DATA_HOST=datatx_info['host'], + DATA_USER=datatx_info['username'], + DATA_PASSWORD=datatx_info['password'], + SAMPLE_ID=str(sample.id), + LIBRARY_ID=str(sample.library.id), + FOLDER_ID=str(sample.folder.id), + DATASETS=datasets) + # now send this message + conn = amqp.Connection(host=trans.app.config.amqp['host']+":"+trans.app.config.amqp['port'], + userid=trans.app.config.amqp['userid'], + password=trans.app.config.amqp['password'], + virtual_host=trans.app.config.amqp['virtual_host'], + insist=False) + chan = conn.channel() + msg = amqp.Message(data, + content_type='text/plain', + application_headers={'msg_type': 'data_transfer'}) + msg.properties["delivery_mode"] = 2 + chan.basic_publish(msg, + exchange=trans.app.config.amqp['exchange'], + routing_key=trans.app.config.amqp['routing_key']) + chan.close() + conn.close() + def __start_datatx(self, trans, sample): # data transfer user datatx_user = self.__setup_datatx_user(trans, sample.library, sample.folder) @@ -1635,6 +1704,11 @@ sample_id=trans.security.encode_id(sample.id), status='error', message=message)) + self.__send_message(trans, datatx_info, sample) + return trans.response.send_redirect( web.url_for( controller='requests_admin', + action='show_datatx_page', + sample_id=trans.security.encode_id(sample.id), + folder_path=datatx_info['data_dir'])) error_message = '' transfer_script = "scripts/galaxy_messaging/server/data_transfer.py" for index, dataset in enumerate(sample.dataset_files): @@ -1670,6 +1744,33 @@ action='show_datatx_page', sample_id=trans.security.encode_id(sample.id), folder_path=os.path.dirname(dfile))) + + @web.expose + @web.require_admin + def dataset_details( self, trans, **kwd ): + try: + sample = trans.sa_session.query( trans.app.model.Sample ).get( trans.security.decode_id(kwd['sample_id']) ) + except: + return trans.response.send_redirect( web.url_for( controller='requests_admin', + action='list', + status='error', + message="Invalid sample ID" ) ) + params = util.Params( kwd ) + message = util.restore_text( params.get( 'message', '' ) ) + status = params.get( 'status', 'done' ) + dataset_index = int( params.get( 'dataset_index', '' ) ) + if params.get('save', '') == 'Save': + sample.dataset_files[dataset_index]['name'] = util.restore_text( params.get( 'name', + sample.dataset_files[dataset_index]['name'] ) ) + trans.sa_session.add( sample ) + trans.sa_session.flush() + status = 'done' + message = 'Saved the changes made to the dataset.' 
+ return trans.fill_template( '/admin/requests/dataset.mako', + sample=sample, + dataset_index=dataset_index, + message=message, + status=status) ## #### Request Type Stuff ################################################### ## diff -r 207d0d70483b -r 076f572d7c9d lib/galaxy/web/framework/__init__.py --- a/lib/galaxy/web/framework/__init__.py Tue Apr 20 15:36:03 2010 -0400 +++ b/lib/galaxy/web/framework/__init__.py Wed Apr 21 10:41:30 2010 -0400 @@ -32,6 +32,7 @@ from sqlalchemy import and_ pkg_resources.require( "pexpect" ) +pkg_resources.require( "amqplib" ) import logging log = logging.getLogger( __name__ ) diff -r 207d0d70483b -r 076f572d7c9d run_galaxy_listener.sh --- a/run_galaxy_listener.sh Tue Apr 20 15:36:03 2010 -0400 +++ b/run_galaxy_listener.sh Wed Apr 21 10:41:30 2010 -0400 @@ -1,4 +1,4 @@ #!/bin/sh cd `dirname $0` -python scripts/galaxy_messaging/server/amqp_consumer.py universe_wsgi.ini >> galaxy_listener.log 2>&1 \ No newline at end of file +python scripts/galaxy_messaging/server/amqp_consumer.py universe_wsgi.ini 2>&1 \ No newline at end of file diff -r 207d0d70483b -r 076f572d7c9d scripts/galaxy_messaging/client/amqp_publisher.py --- a/scripts/galaxy_messaging/client/amqp_publisher.py Tue Apr 20 15:36:03 2010 -0400 +++ b/scripts/galaxy_messaging/client/amqp_publisher.py Wed Apr 21 10:41:30 2010 -0400 @@ -35,7 +35,9 @@ virtual_host=amqp_config['virtual_host'], insist=False) chan = conn.channel() - msg = amqp.Message(data) + msg = amqp.Message(data, + content_type='text/plain', + application_headers={'msg_type': 'sample_state_update'}) msg.properties["delivery_mode"] = 2 chan.basic_publish(msg, exchange=amqp_config['exchange'], diff -r 207d0d70483b -r 076f572d7c9d scripts/galaxy_messaging/server/amqp_consumer.py --- a/scripts/galaxy_messaging/server/amqp_consumer.py Tue Apr 20 15:36:03 2010 -0400 +++ b/scripts/galaxy_messaging/server/amqp_consumer.py Wed Apr 21 10:41:30 2010 -0400 @@ -13,6 +13,7 @@ import sys, os import optparse import xml.dom.minidom +import subprocess from galaxydb_interface import GalaxyDbInterface assert sys.version_info[:2] >= ( 2, 4 ) @@ -27,8 +28,13 @@ from amqplib import client_0_8 as amqp import logging -logging.basicConfig(level=logging.DEBUG) -log = logging.getLogger( 'GalaxyAMQP' ) +log = logging.getLogger("GalaxyAMQP") +log.setLevel(logging.DEBUG) +fh = logging.FileHandler("galaxy_listener.log") +fh.setLevel(logging.DEBUG) +formatter = logging.Formatter("%(asctime)s - %(name)s - %(message)s") +fh.setFormatter(formatter) +log.addHandler(fh) global dbconnstr @@ -43,19 +49,47 @@ rc = rc + node.data return rc +def get_value_index(dom, tag_name, index): + ''' + This method extracts the tag value from the xml message + ''' + try: + nodelist = dom.getElementsByTagName(tag_name)[index].childNodes + except: + return None + rc = "" + for node in nodelist: + if node.nodeType == node.TEXT_NODE: + rc = rc + node.data + return rc + def recv_callback(msg): - dom = xml.dom.minidom.parseString(msg.body) - barcode = get_value(dom, 'barcode') - state = get_value(dom, 'state') - log.debug('Barcode: '+barcode) - log.debug('State: '+state) - # update the galaxy db - galaxy = GalaxyDbInterface(dbconnstr) - sample_id = galaxy.get_sample_id(field_name='bar_code', value=barcode) - if sample_id == -1: - log.debug('Invalid barcode.') - return - galaxy.change_state(sample_id, state) + # check the meesage type. 
+ msg_type = msg.properties['application_headers'].get('msg_type') + log.debug('\nMESSAGE RECVD: '+str(msg_type)) + if msg_type == 'data_transfer': + log.debug('DATA TRANSFER') + # fork a new process to transfer datasets + transfer_script = "scripts/galaxy_messaging/server/data_transfer.py" + cmd = ( "python", + transfer_script, + msg.body ) + pid = subprocess.Popen(cmd).pid + log.debug('Started process (%i): %s' % (pid, str(cmd))) + elif msg_type == 'sample_state_update': + log.debug('SAMPLE STATE UPDATE') + dom = xml.dom.minidom.parseString(msg.body) + barcode = get_value(dom, 'barcode') + state = get_value(dom, 'state') + log.debug('Barcode: '+barcode) + log.debug('State: '+state) + # update the galaxy db + galaxy = GalaxyDbInterface(dbconnstr) + sample_id = galaxy.get_sample_id(field_name='bar_code', value=barcode) + if sample_id == -1: + log.debug('Invalid barcode.') + return + galaxy.change_state(sample_id, state) def main(): if len(sys.argv) < 2: @@ -66,8 +100,8 @@ global dbconnstr dbconnstr = config.get("app:main", "database_connection") amqp_config = {} - for option in config.options("galaxy:amqp"): - amqp_config[option] = config.get("galaxy:amqp", option) + for option in config.options("galaxy_amqp"): + amqp_config[option] = config.get("galaxy_amqp", option) log.debug(str(amqp_config)) conn = amqp.Connection(host=amqp_config['host']+":"+amqp_config['port'], userid=amqp_config['userid'], diff -r 207d0d70483b -r 076f572d7c9d scripts/galaxy_messaging/server/data_transfer.py --- a/scripts/galaxy_messaging/server/data_transfer.py Tue Apr 20 15:36:03 2010 -0400 +++ b/scripts/galaxy_messaging/server/data_transfer.py Wed Apr 21 10:41:30 2010 -0400 @@ -8,28 +8,36 @@ Usage: -python data_transfer.py <sequencer_host> - <username> - <password> - <source_file> - <sample_id> - <dataset_index> - <library_id> - <folder_id> +python data_transfer.py <data_transfer_xml> + + """ import ConfigParser import sys, os, time, traceback import optparse import urllib,urllib2, cookielib, shutil import logging, time +import xml.dom.minidom + +sp = sys.path[0] + from galaxydb_interface import GalaxyDbInterface assert sys.version_info[:2] >= ( 2, 4 ) +new_path = [ sp ] +new_path.extend( sys.path ) +sys.path = new_path + +from galaxyweb_interface import GalaxyWebInterface + +assert sys.version_info[:2] >= ( 2, 4 ) new_path = [ os.path.join( os.getcwd(), "lib" ) ] new_path.extend( sys.path[1:] ) # remove scripts/ from the path sys.path = new_path + from galaxy.util.json import from_json_string, to_json_string +from galaxy.model import Sample from galaxy import eggs import pkg_resources pkg_resources.require( "pexpect" ) @@ -38,28 +46,39 @@ pkg_resources.require( "simplejson" ) import simplejson -logging.basicConfig(filename=sys.stderr, level=logging.DEBUG, - format="%(asctime)s [%(levelname)s] %(message)s") - -class DataTransferException(Exception): - def __init__(self, value): - self.msg = value - def __str__(self): - return repr(self.msg) +log = logging.getLogger("datatx_"+str(os.getpid())) +log.setLevel(logging.DEBUG) +fh = logging.FileHandler("data_transfer.log") +fh.setLevel(logging.DEBUG) +formatter = logging.Formatter("%(asctime)s - %(name)s - %(message)s") +fh.setFormatter(formatter) +log.addHandler(fh) class DataTransfer(object): - def __init__(self, host, username, password, remote_file, sample_id, - dataset_index, library_id, folder_id): - self.host = host - self.username = username - self.password = password - self.remote_file = remote_file - self.sample_id = sample_id - self.dataset_index = 
dataset_index - self.library_id = library_id - self.folder_id = folder_id + def __init__(self, msg): + log.info(msg) + self.dom = xml.dom.minidom.parseString(msg) + self.host = self.get_value(self.dom, 'data_host') + self.username = self.get_value(self.dom, 'data_user') + self.password = self.get_value(self.dom, 'data_password') + self.sample_id = self.get_value(self.dom, 'sample_id') + self.library_id = self.get_value(self.dom, 'library_id') + self.folder_id = self.get_value(self.dom, 'folder_id') + self.dataset_files = [] + count=0 + while True: + index = self.get_value_index(self.dom, 'index', count) + file = self.get_value_index(self.dom, 'file', count) + name = self.get_value_index(self.dom, 'name', count) + if file: + self.dataset_files.append(dict(name=name, + index=int(index), + file=file)) + else: + break + count=count+1 try: # Retrieve the upload user login information from the config file config = ConfigParser.ConfigParser() @@ -75,11 +94,13 @@ os.mkdir(self.server_dir) if not os.path.exists(self.server_dir): raise Exception + # connect to db + self.galaxydb = GalaxyDbInterface(self.database_connection) except: - logging.error(traceback.format_exc()) - logging.error('FATAL ERROR') + log.error(traceback.format_exc()) + log.error('FATAL ERROR') if self.database_connection: - self.update_status('Error') + self.error_and_exit('Error') sys.exit(1) def start(self): @@ -88,13 +109,13 @@ to the data library & finally updates the data transfer status in the db ''' # datatx - self.transfer_file() + self.transfer_files() # add the dataset to the given library self.add_to_library() # update the data transfer status in the db - self.update_status('Complete') + self.update_status(Sample.transfer_status.COMPLETE) # cleanup - self.cleanup() + #self.cleanup() sys.exit(0) def cleanup(self): @@ -114,34 +135,39 @@ This method is called any exception is raised. 
This prints the traceback and terminates this script ''' - logging.error(traceback.format_exc()) - logging.error('FATAL ERROR.'+msg) - self.update_status('Error.'+msg) + log.error(traceback.format_exc()) + log.error('FATAL ERROR.'+msg) + self.update_status('Error.', 'All', msg) sys.exit(1) - def transfer_file(self): + def transfer_files(self): ''' This method executes a scp process using pexpect library to transfer the dataset file from the remote sequencer to the Galaxy server ''' def print_ticks(d): pass - try: - cmd = "scp %s@%s:%s %s" % ( self.username, - self.host, - self.remote_file, - self.server_dir) - logging.debug(cmd) - output = pexpect.run(cmd, events={'.ssword:*': self.password+'\r\n', - pexpect.TIMEOUT:print_ticks}, - timeout=10) - logging.debug(output) - if not os.path.exists(os.path.join(self.server_dir, os.path.basename(self.remote_file))): - raise DataTransferException('Could not find the local file after transfer (%s)' % os.path.join(self.server_dir, os.path.basename(self.remote_file))) - except DataTransferException, (e): - self.error_and_exit(e.msg) - except: - self.error_and_exit() + for i, df in enumerate(self.dataset_files): + self.update_status(Sample.transfer_status.TRANSFERRING, df['index']) + try: + cmd = "scp %s@%s:%s %s/%s" % ( self.username, + self.host, + df['file'], + self.server_dir, + df['name']) + log.debug(cmd) + output = pexpect.run(cmd, events={'.ssword:*': self.password+'\r\n', + pexpect.TIMEOUT:print_ticks}, + timeout=10) + log.debug(output) + path = os.path.join(self.server_dir, os.path.basename(df['file'])) + if not os.path.exists(path): + msg = 'Could not find the local file after transfer (%s)' % path + log.error(msg) + raise Exception(msg) + except Exception, e: + msg = traceback.format_exc() + self.update_status('Error', df['index'], msg) def add_to_library(self): @@ -149,73 +175,72 @@ This method adds the dataset file to the target data library & folder by opening the corresponding url in Galaxy server running. ''' - try: - logging.debug('Adding %s to library...' 
% os.path.basename(self.remote_file)) - # create url - base_url = "http://%s:%s" % (self.server_host, self.server_port) - # login - url = "%s/user/login?email=%s&password=%s" % (base_url, self.datatx_email, self.datatx_password) - cj = cookielib.CookieJar() - opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj)) - f = opener.open(url) - if f.read().find("ogged in as "+self.datatx_email) == -1: - # if the user doesnt exist, create the user - url = "%s/user/create?email=%s&username=%s&password=%s&confirm=%s&create_user_button=Submit" % ( base_url, self.datatx_email, self.datatx_email, self.datatx_password, self.datatx_password ) - f = opener.open(url) - if f.read().find("ogged in as "+self.datatx_email) == -1: - raise DataTransferException("The "+self.datatx_email+" user could not login to Galaxy") - # after login, add dataset to the library - params = urllib.urlencode(dict( cntrller='library_admin', - tool_id='upload1', - tool_state='None', - library_id=self.library_id, - folder_id=self.folder_id, - upload_option='upload_directory', - file_type='auto', - server_dir=os.path.basename(self.server_dir), - dbkey='', - runtool_btn='Upload to library')) - #url = "http://localhost:8080/library_common/upload_library_dataset?cntrller=library_admin&tool_id=upload1&tool_state=None&library_id=adb5f5c93f827949&folder_id=adb5f5c93f827949&upload_option=upload_directory&file_type=auto&server_dir=003&dbkey=%3F&message=&runtool_btn=Upload+to+library" - #url = base_url+"/library_common/upload_library_dataset?library_id=adb5f5c93f827949&tool_id=upload1&file_type=auto&server_dir=datatx_22858&dbkey=%3F&upload_option=upload_directory&folder_id=529fd61ab1c6cc36&cntrller=library_admin&tool_state=None&runtool_btn=Upload+to+library" - url = base_url+"/library_common/upload_library_dataset" - logging.debug(url) - logging.debug(params) - f = opener.open(url, params) - if f.read().find("Data Library") == -1: - raise DataTransferException("Dataset could not be uploaded to the data library") - # finally logout - f = opener.open(base_url+'/user/logout') - if f.read().find("You have been logged out.") == -1: - raise DataTransferException("The "+self.datatx_email+" user could not logout of Galaxy") - except DataTransferException, (e): - self.error_and_exit(e.msg) - except: - self.error_and_exit() + self.update_status(Sample.transfer_status.ADD_TO_LIBRARY) + galaxyweb = GalaxyWebInterface(self.server_host, self.server_port, + self.datatx_email, self.datatx_password) + galaxyweb.add_to_library(self.server_dir, self.library_id, self.folder_id) + galaxyweb.logout() - def update_status(self, status): + def update_status(self, status, dataset_index='All', msg=''): ''' Update the data transfer status for this dataset in the database ''' try: - galaxy = GalaxyDbInterface(self.database_connection) - df = from_json_string(galaxy.get_sample_dataset_files(self.sample_id)) - logging.debug(df) - df[self.dataset_index][1] = status - galaxy.set_sample_dataset_files(self.sample_id, to_json_string(df)) - logging.debug("######################\n"+str(from_json_string(galaxy.get_sample_dataset_files(self.sample_id))[self.dataset_index])) + log.debug('Setting status "%s" for sample "%s"' % ( status, str(dataset_index) ) ) + df = from_json_string(self.galaxydb.get_sample_dataset_files(self.sample_id)) + if dataset_index == 'All': + for dataset in self.dataset_files: + df[dataset['index']]['status'] = status + if status == 'Error': + df[dataset['index']]['error_msg'] = msg + else: + df[dataset['index']]['error_msg'] = '' + + else: + 
df[dataset_index]['status'] = status + if status == 'Error': + df[dataset_index]['error_msg'] = msg + else: + df[dataset_index]['error_msg'] = '' + + self.galaxydb.set_sample_dataset_files(self.sample_id, to_json_string(df)) + log.debug('done.') except: - logging.error(traceback.format_exc()) - logging.error('FATAL ERROR') + log.error(traceback.format_exc()) + log.error('FATAL ERROR') sys.exit(1) + + def get_value(self, dom, tag_name): + ''' + This method extracts the tag value from the xml message + ''' + nodelist = dom.getElementsByTagName(tag_name)[0].childNodes + rc = "" + for node in nodelist: + if node.nodeType == node.TEXT_NODE: + rc = rc + node.data + return rc + + def get_value_index(self, dom, tag_name, index): + ''' + This method extracts the tag value from the xml message + ''' + try: + nodelist = dom.getElementsByTagName(tag_name)[index].childNodes + except: + return None + rc = "" + for node in nodelist: + if node.nodeType == node.TEXT_NODE: + rc = rc + node.data + return rc if __name__ == '__main__': - logging.info('STARTING %i %s' % (os.getpid(), str(sys.argv))) - logging.info('daemonized %i' % os.getpid()) + log.info('STARTING %i %s' % (os.getpid(), str(sys.argv))) # # Start the daemon - # - dt = DataTransfer(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4], - int(sys.argv[5]), int(sys.argv[6]), sys.argv[7], sys.argv[8]) + # + dt = DataTransfer(sys.argv[1]) dt.start() sys.exit(0) diff -r 207d0d70483b -r 076f572d7c9d scripts/galaxy_messaging/server/galaxydb_interface.py --- a/scripts/galaxy_messaging/server/galaxydb_interface.py Tue Apr 20 15:36:03 2010 -0400 +++ b/scripts/galaxy_messaging/server/galaxydb_interface.py Wed Apr 21 10:41:30 2010 -0400 @@ -20,8 +20,8 @@ from sqlalchemy import * from sqlalchemy.orm import sessionmaker -logging.basicConfig(level=logging.DEBUG) -log = logging.getLogger( 'GalaxyDbInterface' ) +#logging.basicConfig(level=logging.DEBUG) +#log = logging.getLogger( 'GalaxyDbInterface' ) class GalaxyDbInterface(object): @@ -53,9 +53,8 @@ x = result.fetchone() if x: sample_id = x[0] - log.debug('Sample ID: %i' % sample_id) + #log.debug('Sample ID: %i' % sample_id) return sample_id - log.warning('This sample %s %s does not belong to any sample in the database.' 
% (field_name, value)) return -1 def current_state(self, sample_id): @@ -74,16 +73,16 @@ subsubquery = select(columns=[self.sample_table.c.request_id], whereclause=self.sample_table.c.id==sample_id) self.request_id = subsubquery.execute().fetchall()[0][0] - log.debug('REQUESTID: %i' % self.request_id) + #log.debug('REQUESTID: %i' % self.request_id) subquery = select(columns=[self.request_table.c.request_type_id], whereclause=self.request_table.c.id==self.request_id) request_type_id = subquery.execute().fetchall()[0][0] - log.debug('REQUESTTYPEID: %i' % request_type_id) + #log.debug('REQUESTTYPEID: %i' % request_type_id) query = select(columns=[self.state_table.c.id, self.state_table.c.name], whereclause=self.state_table.c.request_type_id==request_type_id, order_by=self.state_table.c.id.asc()) states = query.execute().fetchall() - log.debug('POSSIBLESTATES: '+ str(states)) + #log.debug('POSSIBLESTATES: '+ str(states)) return states def change_state(self, sample_id, new_state=None): @@ -100,7 +99,7 @@ new_state_id = state_id if new_state_id == -1: return - log.debug('Updating sample_id %i state to %s' % (sample_id, new_state)) + #log.debug('Updating sample_id %i state to %s' % (sample_id, new_state)) i = self.event_table.insert() i.execute(update_time=datetime.utcnow(), create_time=datetime.utcnow(), @@ -120,7 +119,7 @@ break if request_complete: request_state = 'Complete' - log.debug('Updating request_id %i state to "%s"' % (self.request_id, request_state)) + #log.debug('Updating request_id %i state to "%s"' % (self.request_id, request_state)) i = self.request_event_table.insert() i.execute(update_time=datetime.utcnow(), create_time=datetime.utcnow(), diff -r 207d0d70483b -r 076f572d7c9d scripts/galaxy_messaging/server/galaxyweb_interface.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scripts/galaxy_messaging/server/galaxyweb_interface.py Wed Apr 21 10:41:30 2010 -0400 @@ -0,0 +1,132 @@ +import ConfigParser +import sys, os +import serial +import array +import time +import optparse,array +import shutil, traceback +import urllib,urllib2, cookielib + +assert sys.version_info[:2] >= ( 2, 4 ) +new_path = [ os.path.join( os.getcwd(), "lib" ) ] +new_path.extend( sys.path[1:] ) # remove scripts/ from the path +sys.path = new_path + +from galaxy import eggs +import pkg_resources + +import pkg_resources +pkg_resources.require( "pycrypto" ) + +from Crypto.Cipher import Blowfish +from Crypto.Util.randpool import RandomPool +from Crypto.Util import number + + +class GalaxyWebInterface(object): + def __init__(self, server_host, server_port, datatx_email, datatx_password): + self.server_host = server_host#config.get("main", "server_host") + self.server_port = server_port#config.get("main", "server_port") + self.datatx_email = datatx_email#config.get("main", "datatx_email") + self.datatx_password = datatx_password#config.get("main", "datatx_password") + try: + # create url + self.base_url = "http://%s:%s" % (self.server_host, self.server_port) + # login + url = "%s/user/login?email=%s&password=%s&login_button=Login" % (self.base_url, self.datatx_email, self.datatx_password) + cj = cookielib.CookieJar() + self.opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj)) + #print url + f = self.opener.open(url) + if f.read().find("ogged in as "+self.datatx_email) == -1: + # if the user doesnt exist, create the user + url = "%s/user/create?email=%s&username=%s&password=%s&confirm=%s&create_user_button=Submit" % ( self.base_url, self.datatx_email, self.datatx_email, self.datatx_password, 
self.datatx_password ) + f = self.opener.open(url) + if f.read().find("ogged in as "+self.datatx_email) == -1: + raise "The "+self.datatx_email+" user could not login to Galaxy" + except: + print traceback.format_exc() + sys.exit(1) + + def add_to_library(self, server_dir, library_id, folder_id, dbkey=''): + ''' + This method adds the dataset file to the target data library & folder + by opening the corresponding url in Galaxy server running. + ''' + try: + params = urllib.urlencode(dict( cntrller='library_admin', + tool_id='upload1', + tool_state='None', + library_id=self.encode_id(library_id), + folder_id=self.encode_id(folder_id), + upload_option='upload_directory', + file_type='auto', + server_dir=os.path.basename(server_dir), + dbkey=dbkey, + show_dataset_id='True', + runtool_btn='Upload to library')) + #url = "http://localhost:8080/library_common/upload_library_dataset?cntrller=library_admin&tool_id=upload1&tool_state=None&library_id=adb5f5c93f827949&folder_id=adb5f5c93f827949&upload_option=upload_directory&file_type=auto&server_dir=003&dbkey=%3F&message=&runtool_btn=Upload+to+library" + #url = base_url+"/library_common/upload_library_dataset?library_id=adb5f5c93f827949&tool_id=upload1&file_type=auto&server_dir=datatx_22858&dbkey=%3F&upload_option=upload_directory&folder_id=529fd61ab1c6cc36&cntrller=library_admin&tool_state=None&runtool_btn=Upload+to+library" + url = self.base_url+"/library_common/upload_library_dataset" + #print url + #print params + f = self.opener.open(url, params) + if f.read().find("Data Library") == -1: + raise "Dataset could not be uploaded to the data library" + except: + print traceback.format_exc() + sys.exit(1) + + def import_to_history(self, ldda_id, library_id, folder_id): + try: + params = urllib.urlencode(dict( cntrller='library_admin', + show_deleted='False', + library_id=self.encode_id(library_id), + folder_id=self.encode_id(folder_id), + ldda_ids=self.encode_id(ldda_id), + do_action='import_to_history', + use_panels='False')) + #url = "http://lion.bx.psu.edu:8080/library_common/act_on_multiple_datasets?library_id=adb5f5c93f827949&show_deleted=False&ldda_ids=adb5f5c93f827949&cntrller=library_admin&do_action=import_to_history&use_panels=False" + #url = base_url+"/library_common/upload_library_dataset?library_id=adb5f5c93f827949&tool_id=upload1&file_type=auto&server_dir=datatx_22858&dbkey=%3F&upload_option=upload_directory&folder_id=529fd61ab1c6cc36&cntrller=library_admin&tool_state=None&runtool_btn=Upload+to+library" + url = self.base_url+"/library_common/act_on_multiple_datasets" + #print url + #print params + f = self.opener.open(url, params) + x = f.read() + if x.find("1 dataset(s) have been imported into your history.") == -1: + #print x + raise Exception("Dataset could not be imported into history") + except: + print traceback.format_exc() + sys.exit(1) + + + def run_workflow(self, workflow_id, hid, workflow_step): + input = str(workflow_step)+'|input' + try: + params = urllib.urlencode({'id':self.encode_id(workflow_id), + 'run_workflow': 'Run workflow', + input: hid}) + url = self.base_url+"/workflow/run" + #print url+'?'+params + f = self.opener.open(url, params) +# if f.read().find("1 dataset(s) have been imported into your history.") == -1: +# raise Exception("Error in running the workflow") + except: + print traceback.format_exc() + sys.exit(1) + + + def logout(self): + # finally logout + f = self.opener.open(self.base_url+'/user/logout') + + def encode_id(self, obj_id ): + id_secret = 'changethisinproductiontoo' + id_cipher = Blowfish.new( 
id_secret ) + # Convert to string + s = str( obj_id ) + # Pad to a multiple of 8 with leading "!" + s = ( "!" * ( 8 - len(s) % 8 ) ) + s + # Encrypt + return id_cipher.encrypt( s ).encode( 'hex' ) diff -r 207d0d70483b -r 076f572d7c9d templates/admin/requests/dataset.mako --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/templates/admin/requests/dataset.mako Wed Apr 21 10:41:30 2010 -0400 @@ -0,0 +1,71 @@ +<%inherit file="/base.mako"/> +<%namespace file="/message.mako" import="render_msg" /> + + +%if message: + ${render_msg( message, status )} +%endif + +<br/> +<br/> + +<ul class="manage-table-actions"> + <li> + <a class="action-button" href="${h.url_for( controller='requests_admin', action='show_datatx_page', sample_id=trans.security.encode_id(sample.id) )}"> + <span>Dataset transfer page</span></a> + </li> +</ul> + +<div class="toolForm"> + <div class="toolFormTitle">Dataset details</div> + <div class="toolFormBody"> + <form name="dataset_details" action="${h.url_for( controller='requests_admin', action='dataset_details', save_changes=True, sample_id=trans.security.encode_id(sample.id), dataset_index=dataset_index )}" method="post" > + <% + dataset = sample.dataset_files[dataset_index] + %> + <div class="form-row"> + <label>Name:</label> + <div style="float: left; width: 250px; margin-right: 10px;"> + %if dataset['status'] in [sample.transfer_status.IN_QUEUE, sample.transfer_status.NOT_STARTED]: + <input type="text" name="name" value="${dataset['name']}" size="60"/> + %else: + ${dataset['name']} + %endif + + </div> + <div style="clear: both"></div> + </div> + <div class="form-row"> + <label>File on the Sequencer:</label> + <div style="float: left; width: 250px; margin-right: 10px;"> + ${dataset['filepath']} + ##<input type="text" name="filepath" value="${dataset['filepath']}" size="100" readonly/> + </div> + <div style="clear: both"></div> + </div> + <div class="form-row"> + <label>Size:</label> + <div style="float: left; width: 250px; margin-right: 10px;"> + ${dataset.get('size', 'Unknown')} + </div> + <div style="clear: both"></div> + </div> + <div class="form-row"> + <label>Transfer status:</label> + <div style="float: left; width: 250px; margin-right: 10px;"> + ${dataset['status']} + <br/> + %if dataset['status'] == sample.transfer_status.ERROR: + ${dataset['error_msg']} + %endif + </div> + <div style="clear: both"></div> + </div> + %if dataset['status'] in [sample.transfer_status.IN_QUEUE, sample.transfer_status.NOT_STARTED]: + <div class="form-row"> + <input type="submit" name="save" value="Save"/> + </div> + %endif + </form> + </div> +</div> \ No newline at end of file diff -r 207d0d70483b -r 076f572d7c9d templates/admin/requests/get_data.mako --- a/templates/admin/requests/get_data.mako Tue Apr 20 15:36:03 2010 -0400 +++ b/templates/admin/requests/get_data.mako Wed Apr 21 10:41:30 2010 -0400 @@ -53,29 +53,44 @@ <div class="toolForm"> %if len(dataset_files): ## <form name="get_data" action="${h.url_for( controller='requests_admin', action='get_data', sample_id=sample.id)}" method="post" > + <div class="form-row"> + <h4>Sample Dataset(s)</h4> + %if sample.untransferred_dataset_files(): <div class="form-row"> - <h4>Sample Dataset(s)</h4> - <div class="form-row"> - <table class="grid"> - <thead> - <tr> - <th>Dataset File</th> - <th>Transfer Status</th> - <th></th> - </tr> - <thead> - <tbody> - %for dataset_index, dataset_file in enumerate(dataset_files): - ${sample_dataset_files( dataset_index, dataset_file[0], dataset_file[1] )} - %endfor - </tbody> - </table> - </div> - </div> + 
<ul class="manage-table-actions"> + <li> + <a class="action-button" href="${h.url_for( controller='requests_admin', action='get_data', start_transfer_button=True, sample_id=sample.id )}"> + <span>Start transfer</span></a> + </li> + </ul> + </div> + %endif + <div class="form-row"> + <table class="grid"> + <thead> + <tr> + <th>Dataset File</th> + <th>Transfer Status</th> + <th></th> + </tr> + <thead> + <tbody> + %for dataset_index, dataset_file in enumerate(dataset_files): + ${sample_dataset_files( dataset_index, dataset_file['name'], dataset_file['status'] )} + %endfor + </tbody> + </table> + </div> + </div> + ## </form> ##</div> + + +<br/> <br/> %endif + ##<div class="toolForm"> <form name="get_data" action="${h.url_for( controller='requests_admin', action='get_data', sample_id=sample.id)}" method="post" > <div class="form-row"> @@ -102,24 +117,24 @@ navigate away from this page. Once the transfer is complete the dataset(s) will show up on this page. </div> - <input type="submit" name="start_transfer_button" value="Transfer"/> + <input type="submit" name="select_files_button" value="Select"/> </div> </div> </div> </form> </div> -<%def name="sample_dataset_files( dataset_index, dataset_file, status )"> +<%def name="sample_dataset_files( dataset_index, dataset_name, status )"> <tr> <td> -## <label class="msg_head"><a href="${h.url_for( controller='requests_admin', action='show_dataset_file', sample_id=trans.security.encode_id(sample.id), dataset_index=dataset_index )}">${dataset_file.split('/')[-1]}</a></label> - <div class="msg_head"><u>${dataset_file.split('/')[-1]}</u></div> - <div class="msg_body"> - ${dataset_file} - </div> + <label class="msg_head"><a href="${h.url_for( controller='requests_admin', action='dataset_details', sample_id=trans.security.encode_id(sample.id), dataset_index=dataset_index )}">${dataset_name}</a></label> +## <div class="msg_head"><u>${dataset_file.split('/')[-1]}</u></div> +## <div class="msg_body"> +## ${dataset_file} +## </div> </td> <td> - %if status == sample.transfer_status.IN_PROGRESS: + %if status not in [sample.transfer_status.NOT_STARTED, sample.transfer_status.COMPLETE]: <i>${status}</i> %else: ${status} diff -r 207d0d70483b -r 076f572d7c9d universe_wsgi.ini.sample --- a/universe_wsgi.ini.sample Tue Apr 20 15:36:03 2010 -0400 +++ b/universe_wsgi.ini.sample Wed Apr 21 10:41:30 2010 -0400 @@ -287,7 +287,7 @@ # to be set up with a user account and other parameters listed below. The 'host' # and 'port' fields should point to where the RabbitMQ server is running. -#[galaxy:amqp] +[galaxy_amqp] #host = 127.0.0.1 #port = 5672 #userid = galaxy
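
Note on the messaging hand-off introduced above: requests_admin.__send_message() and the stand-alone amqp_publisher.py use the same amqplib publish pattern — build an XML payload, tag it with a 'msg_type' application header, mark it persistent, and publish it to the exchange/routing key configured in the renamed [galaxy_amqp] section of universe_wsgi.ini. amqp_consumer.py reads that same section, branches on the 'msg_type' application header, and forks data_transfer.py for 'data_transfer' messages, while 'sample_state_update' messages are still handled in-process via GalaxyDbInterface. A minimal sketch of the publish step follows; it assumes a reachable RabbitMQ broker, and the connection values below are illustrative placeholders rather than shipped defaults — the controller actually pulls them from trans.app.config.amqp (see the lib/galaxy/config.py hunk above).

    from amqplib import client_0_8 as amqp

    # Placeholder settings standing in for the [galaxy_amqp] section; the keys
    # mirror what __send_message() reads from trans.app.config.amqp.
    amqp_config = dict(host='127.0.0.1', port='5672',
                       userid='galaxy', password='galaxy',
                       virtual_host='galaxy_messaging_engine',
                       exchange='galaxy_exchange',
                       routing_key='bar_code_scanner')
    # Stand-in payload; the real message is built from the <data_transfer>
    # template in __send_message() with one <dataset> element per queued file.
    data = '<data_transfer>...</data_transfer>'

    conn = amqp.Connection(host=amqp_config['host'] + ':' + amqp_config['port'],
                           userid=amqp_config['userid'],
                           password=amqp_config['password'],
                           virtual_host=amqp_config['virtual_host'],
                           insist=False)
    chan = conn.channel()
    msg = amqp.Message(data,
                       content_type='text/plain',
                       application_headers={'msg_type': 'data_transfer'})
    msg.properties['delivery_mode'] = 2   # persistent delivery
    chan.basic_publish(msg,
                       exchange=amqp_config['exchange'],
                       routing_key=amqp_config['routing_key'])
    chan.close()
    conn.close()

Delivery mode 2 marks the message as persistent so a queued transfer request is not tied to the broker's in-memory state; both publishers in this changeset set it before calling basic_publish.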