# HG changeset patch -- Bitbucket.org # Project galaxy-dist # URL http://bitbucket.org/galaxy/galaxy-dist/overview # User rc # Date 1289177219 18000 # Node ID 328b57b1e2e53cbc9ab208379cdd3dc0864f3cc4 # Parent 4b78bad81e777cf503ac66acb5dbb01da841400f sample tracking data transfer mechanism streamlined. - eliminated the transfer_datasets.ini config file - eliminated the data transfer user - now the admin user initiating the data transfer is used as the data transfer user. - the admin user is provided the add_library_item permission before data transfer if they don't have it - sample update & data transfer amqp messages now include api_key
--- a/scripts/galaxy_messaging/client/galaxy_amq.ini.sample +++ b/scripts/galaxy_messaging/client/galaxy_amq.ini.sample @@ -14,6 +14,7 @@ #queue = galaxy_queue #exchange = galaxy_exchange #routing_key = bar_code_scanner +#api_key =
# The following section(s) 'scanner#' is for specifying the state of the # sample this scanner represents. This state name should be one of the
--- a/scripts/galaxy_messaging/server/amqp_consumer.py +++ b/scripts/galaxy_messaging/server/amqp_consumer.py @@ -15,6 +15,9 @@ import optparse import xml.dom.minidom import subprocess import urllib2 + +from xml_helper import get_value, get_value_index + from galaxydb_interface import GalaxyDbInterface
api_path = [ os.path.join( os.getcwd(), "scripts/api" ) ] @@ -44,77 +47,58 @@ log.addHandler(fh) # data transfer script data_transfer_script = os.path.join( os.getcwd(), "scripts/galaxy_messaging/server/data_transfer.py" ) +global config +global config_file_name +global http_server_section
-global dbconnstr -global webconfig -global config
-def get_value(dom, tag_name): - ''' - This method extracts the tag value from the xml message - ''' - nodelist = dom.getElementsByTagName(tag_name)[0].childNodes - rc = "" - for node in nodelist: - if node.nodeType == node.TEXT_NODE: - rc = rc + node.data - return rc - -def get_value_index(dom, tag_name, index): - ''' - This method extracts the tag value from the xml message - ''' - try: - nodelist = dom.getElementsByTagName(tag_name)[index].childNodes - except: - return None - rc = "" - for node in nodelist: - if node.nodeType == node.TEXT_NODE: - rc = rc + node.data - return rc - -def start_data_transfer(message): +def start_data_transfer( message ): # fork a new process to transfer datasets cmd = '%s "%s" "%s" "%s"' % ( "python", data_transfer_script, message.body, - sys.argv[1] ) # Galaxy config file name + config_file_name ) # Galaxy config file name pid = subprocess.Popen(cmd, shell=True).pid log.debug('Started process (%i): %s' % (pid, str(cmd)))
-def update_sample_state(message): +def update_sample_state( message ): dom = xml.dom.minidom.parseString(message.body) barcode = get_value(dom, 'barcode') state = get_value(dom, 'state') + api_key = get_value(dom, 'api_key') log.debug('Barcode: ' + barcode) log.debug('State: ' + state) + log.debug('api_key: ' + api_key) + # validate + if not barcode or not state or not api_key: + log.debug( 'Incomplete sample_state_update message received. Sample barcode, desired state and user API key is required.' ) + return # update the sample state in Galaxy db - galaxydb = GalaxyDbInterface(dbconnstr) - sample_id = galaxydb.get_sample_id(field_name='bar_code', value=barcode) + dbconnstr = config.get("app:main", "database_connection") + galaxydb = GalaxyDbInterface( dbconnstr ) + sample_id = galaxydb.get_sample_id( field_name='bar_code', value=barcode ) if sample_id == -1: - log.debug('Invalid barcode.') + log.debug( 'Invalid barcode.' ) return galaxydb.change_state(sample_id, state) # after updating the sample state, update request status request_id = galaxydb.get_request_id(sample_id) - update_request( request_id ) + update_request( api_key, request_id )
-def update_request( request_id ): - http_server_section = webconfig.get( "universe_wsgi_config", "http_server_section" ) +def update_request( api_key, request_id ): encoded_request_id = api.encode_id( config.get( "app:main", "id_secret" ), request_id ) - api_key = webconfig.get( "data_transfer_user_login_info", "api_key" ) data = dict( update_type=RequestsController.update_types.REQUEST ) url = "http://%s:%s/api/requests/%s" % ( config.get(http_server_section, "host"), config.get(http_server_section, "port"), encoded_request_id ) log.debug( 'Updating request %i' % request_id ) try: - api.update( api_key, url, data, return_formatted=False ) + retval = api.update( api_key, url, data, return_formatted=False ) + log.debug( str( retval ) ) except urllib2.URLError, e: log.debug( 'ERROR(update_request (%s)): %s' % ( str((self.api_key, url, data)), str(e) ) )
-def recv_callback(message): +def recv_callback( message ): # check the meesage type. msg_type = message.properties['application_headers'].get('msg_type') log.debug( 'MESSAGE RECVD: ' + str( msg_type ) ) @@ -126,22 +110,25 @@ def recv_callback(message): update_sample_state( message )
def main(): - if len(sys.argv) < 2: - print 'Usage: python amqp_consumer.py <Galaxy configuration file>' - return + parser = optparse.OptionParser() + parser.add_option('-c', '--config-file', help='Galaxy configuration file', + dest='config_file', action='store') + parser.add_option('-s', '--http-server-section', help='Name of the HTTP server section in the Galaxy configuration file', + dest='http_server_section', action='store') + (opts, args) = parser.parse_args() + log.debug( "GALAXY LISTENER PID: " + str(os.getpid()) + " - " + str( opts ) ) + # read the Galaxy config file + global config_file_name + config_file_name = opts.config_file global config config = ConfigParser.ConfigParser() - config.read( sys.argv[1] ) - global dbconnstr - dbconnstr = config.get("app:main", "database_connection") + config.read( opts.config_file ) + global http_server_section + http_server_section = opts.http_server_section amqp_config = {} for option in config.options("galaxy_amqp"): amqp_config[option] = config.get("galaxy_amqp", option) - log.debug("PID: " + str(os.getpid()) + ", " + str(amqp_config)) - # web server config - global webconfig - webconfig = ConfigParser.ConfigParser() - webconfig.read('transfer_datasets.ini') + log.debug( str( amqp_config ) ) # connect conn = amqp.Connection(host=amqp_config['host']+":"+amqp_config['port'], userid=amqp_config['userid'],
--- /dev/null +++ b/scripts/galaxy_messaging/server/xml_helper.py @@ -0,0 +1,28 @@ +#======= XML helper methods ==================================================== + +import xml.dom.minidom + +def get_value( dom, tag_name ): + ''' + This method extracts the tag value from the xml message + ''' + nodelist = dom.getElementsByTagName( tag_name )[ 0 ].childNodes + rc = "" + for node in nodelist: + if node.nodeType == node.TEXT_NODE: + rc = rc + node.data + return rc + +def get_value_index( dom, tag_name, dataset_id ): + ''' + This method extracts the tag value from the xml message + ''' + try: + nodelist = dom.getElementsByTagName( tag_name )[ dataset_id ].childNodes + except: + return None + rc = "" + for node in nodelist: + if node.nodeType == node.TEXT_NODE: + rc = rc + node.data + return rc
--- a/lib/galaxy/web/controllers/requests_common.py +++ b/lib/galaxy/web/controllers/requests_common.py @@ -655,8 +655,8 @@ class RequestsCommon( BaseController, Us event = trans.model.RequestEvent( request, request.states.SUBMITTED, message ) trans.sa_session.add( event ) trans.sa_session.flush() - if cntrller == 'api': - return 200, message + if cntrller == 'api': + return 200, message return trans.response.send_redirect( web.url_for( controller='requests_common', action='edit_samples', cntrller=cntrller,
--- a/scripts/galaxy_messaging/server/data_transfer.py +++ b/scripts/galaxy_messaging/server/data_transfer.py @@ -19,6 +19,8 @@ import urllib,urllib2, cookielib, shutil import logging, time, datetime import xml.dom.minidom
+from xml_helper import get_value, get_value_index + log = logging.getLogger( "datatx_" + str( os.getpid() ) ) log.setLevel( logging.DEBUG ) fh = logging.FileHandler( "data_transfer.log" ) @@ -36,35 +38,38 @@ new_path = [ os.path.join( os.getcwd(), new_path.extend( sys.path[1:] ) # remove scripts/ from the path sys.path = new_path
+from galaxy import eggs from galaxy.util.json import from_json_string, to_json_string from galaxy.model import SampleDataset from galaxy.web.api.requests import RequestsController -from galaxy import eggs import pkg_resources pkg_resources.require( "pexpect" ) import pexpect - pkg_resources.require( "simplejson" ) import simplejson
+log.debug(str(dir(api))) + class DataTransfer( object ):
def __init__( self, msg, config_file ): log.info( msg ) self.dom = xml.dom.minidom.parseString( msg ) - self.sequencer_host = self.get_value( self.dom, 'data_host' ) - self.sequencer_username = self.get_value( self.dom, 'data_user' ) - self.sequencer_password = self.get_value( self.dom, 'data_password' ) - self.request_id = self.get_value( self.dom, 'request_id' ) - self.sample_id = self.get_value( self.dom, 'sample_id' ) - self.library_id = self.get_value( self.dom, 'library_id' ) - self.folder_id = self.get_value( self.dom, 'folder_id' ) + self.galaxy_host = get_value( self.dom, 'galaxy_host' ) + self.api_key = get_value( self.dom, 'api_key' ) + self.sequencer_host = get_value( self.dom, 'data_host' ) + self.sequencer_username = get_value( self.dom, 'data_user' ) + self.sequencer_password = get_value( self.dom, 'data_password' ) + self.request_id = get_value( self.dom, 'request_id' ) + self.sample_id = get_value( self.dom, 'sample_id' ) + self.library_id = get_value( self.dom, 'library_id' ) + self.folder_id = get_value( self.dom, 'folder_id' ) self.dataset_files = [] count=0 while True: - dataset_id = self.get_value_index( self.dom, 'dataset_id', count ) - file = self.get_value_index( self.dom, 'file', count ) - name = self.get_value_index( self.dom, 'name', count ) + dataset_id = get_value_index( self.dom, 'dataset_id', count ) + file = get_value_index( self.dom, 'file', count ) + name = get_value_index( self.dom, 'name', count ) if file: self.dataset_files.append( dict( name=name, dataset_id=int( dataset_id ), @@ -72,18 +77,6 @@ class DataTransfer( object ): else: break count=count+1 - try: - # Retrieve the upload user login information from the config file - transfer_datasets_config = ConfigParser.ConfigParser( ) - transfer_datasets_config.read( 'transfer_datasets.ini' ) - self.data_transfer_user_email = transfer_datasets_config.get( "data_transfer_user_login_info", "email" ) - self.data_transfer_user_password = transfer_datasets_config.get( 
"data_transfer_user_login_info", "password" ) - self.api_key = transfer_datasets_config.get( "data_transfer_user_login_info", "api_key" ) - self.http_server_section = transfer_datasets_config.get( "universe_wsgi_config", "http_server_section" ) - except: - log.error( traceback.format_exc() ) - log.error( 'ERROR reading config values from transfer_datasets.ini.' ) - sys.exit(1) # read config variables config = ConfigParser.ConfigParser() retval = config.read( config_file ) @@ -92,14 +85,6 @@ class DataTransfer( object ): log.error( error_msg ) sys.exit(1) try: - self.server_host = config.get( self.http_server_section, "host" ) - except ConfigParser.NoOptionError,e: - self.server_host = '127.0.0.1' - try: - self.server_port = config.get( self.http_server_section, "port" ) - except ConfigParser.NoOptionError,e: - self.server_port = '8080' - try: self.config_id_secret = config.get( "app:main", "id_secret" ) except ConfigParser.NoOptionError,e: self.config_id_secret = "USING THE DEFAULT IS NOT SECURE!" 
@@ -198,9 +183,8 @@ class DataTransfer( object ): data[ 'dbkey' ] = '' data[ 'upload_option' ] = 'upload_directory' data[ 'create_type' ] = 'file' - url = "http://%s:%s/api/libraries/%s/contents" % ( self.server_host, - self.server_port, - api.encode_id( self.config_id_secret, self.library_id ) ) + url = "http://%s/api/libraries/%s/contents" % ( self.galaxy_host, + api.encode_id( self.config_id_secret, self.library_id ) ) log.debug( str( ( self.api_key, url, data ) ) ) retval = api.submit( self.api_key, url, data, return_formatted=False ) log.debug( str( retval ) ) @@ -225,9 +209,8 @@ class DataTransfer( object ): data[ 'sample_dataset_ids' ] = sample_dataset_ids data[ 'new_status' ] = status data[ 'error_msg' ] = msg - url = "http://%s:%s/api/requests/%s" % ( self.server_host, - self.server_port, - api.encode_id( self.config_id_secret, self.request_id ) ) + url = "http://%s/api/requests/%s" % ( self.galaxy_host, + api.encode_id( self.config_id_secret, self.request_id ) ) log.debug( str( ( self.api_key, url, data))) retval = api.update( self.api_key, url, data, return_formatted=False ) log.debug( str( retval ) ) @@ -239,31 +222,6 @@ class DataTransfer( object ): log.error( 'FATAL ERROR' ) sys.exit( 1 )
- def get_value( self, dom, tag_name ): - ''' - This method extracts the tag value from the xml message - ''' - nodelist = dom.getElementsByTagName( tag_name )[ 0 ].childNodes - rc = "" - for node in nodelist: - if node.nodeType == node.TEXT_NODE: - rc = rc + node.data - return rc - - def get_value_index( self, dom, tag_name, dataset_id ): - ''' - This method extracts the tag value from the xml message - ''' - try: - nodelist = dom.getElementsByTagName( tag_name )[ dataset_id ].childNodes - except: - return None - rc = "" - for node in nodelist: - if node.nodeType == node.TEXT_NODE: - rc = rc + node.data - return rc - if __name__ == '__main__': log.info( 'STARTING %i %s' % ( os.getpid(), str( sys.argv ) ) ) #
--- a/lib/galaxy/web/controllers/requests_admin.py +++ b/lib/galaxy/web/controllers/requests_admin.py @@ -109,7 +109,7 @@ class DataTransferGrid( grids.Grid ): default_sort_key = "-create_time" num_rows_per_page = 50 preserve_state = True - use_paging = True + use_paging = False columns = [ NameColumn( "Name", link=( lambda item: dict( operation="view", id=item.id ) ), @@ -299,7 +299,7 @@ class RequestsAdmin( BaseController, Use sample=selected_sample_datasets[0].sample, id_list=id_list ) elif operation == "transfer": - self.__start_datatx( trans, selected_sample_datasets[0].sample, selected_sample_datasets ) + self.initiate_data_transfer( trans, selected_sample_datasets[0].sample, selected_sample_datasets ) # Render the grid view sample_id = params.get( 'sample_id', None ) try: @@ -538,68 +538,36 @@ class RequestsAdmin( BaseController, Use return sample.request.name + '_' + sample.name + '_' + name if opt == options.EXPERIMENT_NAME: return sample.request.name + '_' + name - def __setup_datatx_user( self, trans, sample ): + def __check_library_add_permission( self, trans, target_library, target_folder ): """ - Sets up the datatx user: - - Checks if the user exists, if not creates them. - - Checks if the user had ADD_LIBRARY permission on the target library - and the target folder, if not sets up the permissions. + Checks if the current admin user had ADD_LIBRARY permission on the target library + and the target folder, if not provide the permissions. """ - # Retrieve the upload user login information from the config file - config = ConfigParser.ConfigParser() - ok = True - try: - config.read( 'transfer_datasets.ini' ) - except Exception, e: - message = "Error attempting to read config file named 'transfer_datasets.ini'. Make sure this file is correct." 
- ok = False - try: - email = config.get( "data_transfer_user_login_info", "email" ) - password = config.get( "data_transfer_user_login_info", "password" ) - except Exception, e: - message = "The 'data_transfer_user_login_info' section is missing from the 'transfer_datasets.ini'. Make sure this file is correct." - ok = False - if not ok: - status = 'error' - return trans.response.send_redirect( web.url_for( controller='requests_admin', - action='manage_datasets', - sample_id=trans.security.encode_id( sample.id ), - status=status, - message=message ) ) - # check if the user already exists - datatx_user = trans.sa_session.query( trans.model.User ) \ - .filter( trans.model.User.table.c.email==email ) \ - .first() - if not datatx_user: - # if not create the user - datatx_user = trans.model.User( email=email, password=passsword ) - if trans.app.config.use_remote_user: - datatx_user.external = True - trans.sa_session.add( datatx_user ) - trans.sa_session.flush() - trans.app.security_agent.create_private_user_role( datatx_user ) - trans.app.security_agent.user_set_default_permissions( datatx_user, history=False, dataset=False ) - datatx_user_roles = datatx_user.all_roles() - datatx_user_private_role = trans.app.security_agent.get_private_user_role( datatx_user ) + current_user_roles = trans.user.all_roles() + current_user_private_role = trans.app.security_agent.get_private_user_role( trans.user ) # Make sure this user has LIBRARY_ADD permissions on the target library and folder. # If not, give them permission. 
- if not trans.app.security_agent.can_add_library_item( datatx_user_roles, sample.library ): + if not trans.app.security_agent.can_add_library_item( current_user_roles, target_library ): lp = trans.model.LibraryPermissions( trans.app.security_agent.permitted_actions.LIBRARY_ADD.action, - sample.library, - datatx_user_private_role ) + target_library, + current_user_private_role ) trans.sa_session.add( lp ) - if not trans.app.security_agent.can_add_library_item( datatx_user_roles, sample.folder ): + if not trans.app.security_agent.can_add_library_item( current_user_roles, target_folder ): lfp = trans.model.LibraryFolderPermissions( trans.app.security_agent.permitted_actions.LIBRARY_ADD.action, - sample.folder, - datatx_user_private_role ) + target_folder, + current_user_private_role ) trans.sa_session.add( lfp ) trans.sa_session.flush() - return datatx_user - def __send_message( self, trans, datatx_info, sample, selected_sample_datasets ): - """Ceates an xml message and sends it to the rabbitmq server""" + def __create_data_transfer_message( self, trans, sample, selected_sample_datasets ): + """ + Creates an xml message to send to the rabbitmq server + """ + datatx_info = sample.request.type.datatx_info # Create the xml message based on the following template xml = \ ''' <data_transfer> + <galaxy_host>%(GALAXY_HOST)s</galaxy_host> + <api_key>%(API_KEY)s</api_key><data_host>%(DATA_HOST)s</data_host><data_user>%(DATA_USER)s</data_user><data_password>%(DATA_PASSWORD)s</data_password> @@ -624,50 +592,77 @@ class RequestsAdmin( BaseController, Use sample_dataset.status = trans.app.model.SampleDataset.transfer_status.IN_QUEUE trans.sa_session.add( sample_dataset ) trans.sa_session.flush() - data = xml % dict( DATA_HOST=datatx_info['host'], - DATA_USER=datatx_info['username'], - DATA_PASSWORD=datatx_info['password'], - REQUEST_ID=str(sample.request.id), - SAMPLE_ID=str(sample.id), - LIBRARY_ID=str(sample.library.id), - FOLDER_ID=str(sample.folder.id), - DATASETS=datasets ) 
- # Send the message - try: - conn = amqp.Connection( host=trans.app.config.amqp['host'] + ":" + trans.app.config.amqp['port'], - userid=trans.app.config.amqp['userid'], - password=trans.app.config.amqp['password'], - virtual_host=trans.app.config.amqp['virtual_host'], - insist=False ) - chan = conn.channel() - msg = amqp.Message( data.replace( '\n', '' ).replace( '\r', '' ), - content_type='text/plain', - application_headers={'msg_type': 'data_transfer'} ) - msg.properties["delivery_mode"] = 2 - chan.basic_publish( msg, - exchange=trans.app.config.amqp['exchange'], - routing_key=trans.app.config.amqp['routing_key'] ) - chan.close() - conn.close() - except Exception, e: - message = "Error in sending the data transfer message to the Galaxy AMQP message queue:<br/>%s" % str(e) - status = "error" - return trans.response.send_redirect( web.url_for( controller='requests_admin', - action='manage_datasets', - sample_id=trans.security.encode_id( sample.id ), - status=status, - message=message) ) - - def __start_datatx( self, trans, sample, selected_sample_datasets ): - datatx_user = self.__setup_datatx_user( trans, sample ) - # Validate sequencer information + message = xml % dict( GALAXY_HOST=trans.request.host, + API_KEY=trans.user.api_keys[0].key, + DATA_HOST=datatx_info[ 'host' ], + DATA_USER=datatx_info[ 'username' ], + DATA_PASSWORD=datatx_info[ 'password' ], + REQUEST_ID=str( sample.request.id ), + SAMPLE_ID=str( sample.id ), + LIBRARY_ID=str( sample.library.id ), + FOLDER_ID=str( sample.folder.id ), + DATASETS=datasets ) + return message + def __validate_data_transfer_settings( self, trans, sample ): + err_msg = '' + # check the sequencer login info datatx_info = sample.request.type.datatx_info - if not datatx_info['host'] or not datatx_info['username'] or not datatx_info['password']: - message = "Error in sequencer login information." 
- status = "error" - else: - self.__send_message( trans, datatx_info, sample, selected_sample_datasets ) - message = "%i datasets have been queued for transfer from the sequencer. Click the Refresh button above to see the latest transfer status." % len( selected_sample_datasets ) + if not datatx_info[ 'host' ] \ + or not datatx_info[ 'username' ] \ + or not datatx_info[ 'password' ]: + err_msg = "Error in sequencer login information." + # check if web API is enabled and API key exists + if not trans.user.api_keys or not trans.app.config.enable_api: + err_msg = "Could not start data transfer as Galaxy Web API is not enabled. Enable Galaxy Web API in the Galaxy config file and create an API key." + # check if library_import_dir is set + if not trans.app.config.library_import_dir: + err_msg = "'library_import_dir' config variable is not set in the Galaxy config file." + # check the RabbitMQ server settings in the config file + for k, v in trans.app.config.amqp.items(): + if not v: + err_msg = 'Set RabbitMQ server settings in the "galaxy_amqp" section of the Galaxy config file. %s is not set.' % k + break + return err_msg + def initiate_data_transfer( self, trans, sample, selected_sample_datasets ): + ''' + This method initiates the transfer of the datasets from the sequencer. 
It + happens in the following steps: + - The current admin user needs to have ADD_LIBRARY_ITEM permission for the + target library and folder + - Create an XML message encapsulating all the data transfer info and send it + to the message queue (RabbitMQ broker) + ''' + # check data transfer settings + err_msg = self.__validate_data_transfer_settings( trans, sample ) + if not err_msg: + # check if the current user has add_library_item permission to the sample + # target library & folder + self.__check_library_add_permission( trans, sample.library, sample.folder ) + # create the message + message = self.__create_data_transfer_message( trans, + sample, + selected_sample_datasets ) + # Send the message + try: + conn = amqp.Connection( host=trans.app.config.amqp[ 'host' ] + ":" + trans.app.config.amqp[ 'port' ], + userid=trans.app.config.amqp[ 'userid' ], + password=trans.app.config.amqp[ 'password' ], + virtual_host=trans.app.config.amqp[ 'virtual_host' ], + insist=False ) + chan = conn.channel() + msg = amqp.Message( message.replace( '\n', '' ).replace( '\r', '' ), + content_type='text/plain', + application_headers={ 'msg_type': 'data_transfer' } ) + msg.properties[ "delivery_mode" ] = 2 + chan.basic_publish( msg, + exchange=trans.app.config.amqp[ 'exchange' ], + routing_key=trans.app.config.amqp[ 'routing_key' ] ) + chan.close() + conn.close() + except Exception, e: + err_msg = "Error in sending the data transfer message to the Galaxy AMQP message queue:<br/>%s" % str(e) + if not err_msg: + message = "%i datasets have been queued for transfer from the sequencer. Click the Refresh button above to monitor the transfer status." % len( selected_sample_datasets ) status = "done" return trans.response.send_redirect( web.url_for( controller='requests_admin', action='manage_datasets',
--- a/transfer_datasets.ini.sample +++ /dev/null @@ -1,10 +0,0 @@ -# Galaxy LIMS Transfer Datasets Configuration File - -[data_transfer_user_login_info] -#email = datatx@bx.psu.edu -#password = datatx -#api_key = Generate it from the User menu in Galaxy - -[universe_wsgi_config] -# The http server section name in the Galaxy config file (universe_wsgi.ini) -#http_server_section = server:main
--- a/lib/galaxy/web/api/requests.py +++ b/lib/galaxy/web/api/requests.py @@ -88,7 +88,7 @@ class RequestsController( BaseController request_id = trans.security.decode_id( id ) except TypeError: trans.response.status = 400 - return "Malformed %s id ( %s ) specified, unable to decode." % ( update_type, str( id ) ) + return "Malformed request id ( %s ) specified, unable to decode." % str( id ) try: request = trans.sa_session.query( trans.app.model.Request ).get( request_id ) except:
--- a/run_galaxy_listener.sh +++ b/run_galaxy_listener.sh @@ -1,4 +1,4 @@ #!/bin/sh
cd `dirname $0` -python scripts/galaxy_messaging/server/amqp_consumer.py universe_wsgi.ini 2>&1 +python scripts/galaxy_messaging/server/amqp_consumer.py --config-file=universe_wsgi.ini --http-server-section=server:main 2>&1
--- a/scripts/galaxy_messaging/client/amqp_publisher.py +++ b/scripts/galaxy_messaging/client/amqp_publisher.py @@ -19,13 +19,15 @@ xml = \ ''' <sample><barcode>%(BARCODE)s</barcode><state>%(STATE)s</state> + <api_key>%(API_KEY)s</api_key></sample>'''
def handle_scan(states, amqp_config, barcode): if states.get(barcode[:2], None): values = dict( BARCODE=barcode[2:], - STATE=states.get(barcode[:2]) ) + STATE=states.get(barcode[:2]), + API_KEY=amqp_config['api_key'] ) print values data = xml % values print data @@ -68,6 +70,10 @@ def main(): states = {} for option in config.options("galaxy:amqp"): amqp_config[option] = config.get("galaxy:amqp", option) + # abort if api_key is not set in the config file + if not amqp_config['api_key']: + print 'Error: Set the api_key config variable in the config file before starting the amqp_publisher script.' + sys.exit( 1 ) count = 1 while True: section = 'scanner%i' % count
galaxy-commits@lists.galaxyproject.org