1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/890e70b0fd27/ Changeset: 890e70b0fd27 User: dannon Date: 2014-02-19 20:27:10 Summary: Swap amqplib for amqp. This was mostly a drop-in replacement, since it's a currently-maintained fork of amqplib, with slight updates to work with amqp 0-9-1. Affected #: 6 files diff -r dcca6c291a6972ecd9024781667eb670de191c40 -r 890e70b0fd27d78d5982930f0176c26c4bebbfc0 eggs.ini --- a/eggs.ini +++ b/eggs.ini @@ -33,7 +33,6 @@ [eggs:noplatform] amqp = 1.4.3 -amqplib = 0.6.1 anyjson = 0.3.3 Beaker = 1.4 boto = 2.5.2 diff -r dcca6c291a6972ecd9024781667eb670de191c40 -r 890e70b0fd27d78d5982930f0176c26c4bebbfc0 lib/galaxy/web/framework/__init__.py --- a/lib/galaxy/web/framework/__init__.py +++ b/lib/galaxy/web/framework/__init__.py @@ -45,7 +45,7 @@ from sqlalchemy.orm.exc import NoResultFound eggs.require( "pexpect" ) -eggs.require( "amqplib" ) +eggs.require( "amqp" ) import logging log = logging.getLogger( __name__ ) diff -r dcca6c291a6972ecd9024781667eb670de191c40 -r 890e70b0fd27d78d5982930f0176c26c4bebbfc0 lib/galaxy/webapps/demo_sequencer/framework/__init__.py --- a/lib/galaxy/webapps/demo_sequencer/framework/__init__.py +++ b/lib/galaxy/webapps/demo_sequencer/framework/__init__.py @@ -31,7 +31,7 @@ import mako.runtime pkg_resources.require( "pexpect" ) -pkg_resources.require( "amqplib" ) +pkg_resources.require( "amqp" ) import logging log = logging.getLogger( __name__ ) diff -r dcca6c291a6972ecd9024781667eb670de191c40 -r 890e70b0fd27d78d5982930f0176c26c4bebbfc0 lib/galaxy/webapps/galaxy/controllers/requests_admin.py --- a/lib/galaxy/webapps/galaxy/controllers/requests_admin.py +++ b/lib/galaxy/webapps/galaxy/controllers/requests_admin.py @@ -6,7 +6,9 @@ from galaxy import model, util from galaxy.web.form_builder import * from .requests_common import RequestsGrid, invalid_id_redirect -from amqplib import client_0_8 as amqp +from galaxy import eggs +eggs.require("amqp") +import amqp import logging, os, pexpect, ConfigParser log = logging.getLogger( __name__ ) @@ -696,8 +698,7 @@ conn = amqp.Connection( host=trans.app.config.amqp[ 'host' ] + ":" + trans.app.config.amqp[ 'port' ], userid=trans.app.config.amqp[ 'userid' ], password=trans.app.config.amqp[ 'password' ], - virtual_host=trans.app.config.amqp[ 'virtual_host' ], - insist=False ) + virtual_host=trans.app.config.amqp[ 'virtual_host' ]) chan = conn.channel() msg = amqp.Message( rmq_msg, content_type='text/plain', diff -r dcca6c291a6972ecd9024781667eb670de191c40 -r 890e70b0fd27d78d5982930f0176c26c4bebbfc0 scripts/galaxy_messaging/client/amqp_publisher.py --- a/scripts/galaxy_messaging/client/amqp_publisher.py +++ b/scripts/galaxy_messaging/client/amqp_publisher.py @@ -1,18 +1,23 @@ ''' -This script gets barcode data from a barcode scanner using serial communication +This script gets barcode data from a barcode scanner using serial communication and sends the state representated by the barcode scanner & the barcode string -to the Galaxy LIMS RabbitMQ server. The message is sent in XML which has 2 tags, +to the Galaxy LIMS RabbitMQ server. The message is sent in XML which has 2 tags, barcode & state. The state of the scanner should be set in the galaxy_amq.ini -file as a configuration variable. +file as a configuration variable. ''' -from amqplib import client_0_8 as amqp +import array import ConfigParser -import sys, os +import optparse +import os import serial -import array +import sys import time -import optparse + +from galaxy import eggs +eggs.require("amqp") + +import amqp xml = \ @@ -31,14 +36,14 @@ print values data = xml % values print data - conn = amqp.Connection(host=amqp_config['host']+":"+amqp_config['port'], - userid=amqp_config['userid'], - password=amqp_config['password'], - virtual_host=amqp_config['virtual_host'], - insist=False) + conn = amqp.Connection(host=amqp_config['host']+":"+amqp_config['port'], + userid=amqp_config['userid'], + password=amqp_config['password'], + virtual_host=amqp_config['virtual_host'],) + chan = conn.channel() - msg = amqp.Message(data, - content_type='text/plain', + msg = amqp.Message(data, + content_type='text/plain', application_headers={'msg_type': 'sample_state_update'}) msg.properties["delivery_mode"] = 2 chan.basic_publish(msg, @@ -55,13 +60,13 @@ msg = s.read(bytes) print msg handle_scan(states, amqp_config, msg.strip()) - - + + def main(): parser = optparse.OptionParser() - parser.add_option('-c', '--config-file', help='config file with all the AMQP config parameters', + parser.add_option('-c', '--config-file', help='config file with all the AMQP config parameters', dest='config_file', action='store') - parser.add_option('-p', '--port', help='Name of the port where the scanner is connected', + parser.add_option('-p', '--port', help='Name of the port where the scanner is connected', dest='port', action='store') (opts, args) = parser.parse_args() config = ConfigParser.ConfigParser() @@ -81,7 +86,7 @@ states[config.get(section, 'prefix')] = config.get(section, 'state') count = count + 1 else: - break + break print amqp_config print states s = serial.Serial(int(opts.port)) @@ -90,6 +95,6 @@ s.close() print 'Port %s is open: %s' %( opts.port, s.isOpen()) - + if __name__ == '__main__': main() diff -r dcca6c291a6972ecd9024781667eb670de191c40 -r 890e70b0fd27d78d5982930f0176c26c4bebbfc0 scripts/galaxy_messaging/server/amqp_consumer.py --- a/scripts/galaxy_messaging/server/amqp_consumer.py +++ b/scripts/galaxy_messaging/server/amqp_consumer.py @@ -1,8 +1,8 @@ ''' Galaxy Messaging with AMQP (RabbitMQ) -Galaxy uses AMQ protocol to receive messages from external sources like +Galaxy uses AMQ protocol to receive messages from external sources like bar code scanners. Galaxy has been tested against RabbitMQ AMQP implementation. -For Galaxy to receive messages from a message queue the RabbitMQ server has +For Galaxy to receive messages from a message queue the RabbitMQ server has to be set up with a user account and other parameters listed in the [galaxy_amqp] section in the universe_wsgi.ini config file Once the RabbitMQ server has been setup and started with the given parameters, @@ -32,8 +32,8 @@ from galaxy import eggs from galaxy.web.api.requests import RequestsAPIController import pkg_resources -pkg_resources.require( "amqplib" ) -from amqplib import client_0_8 as amqp +pkg_resources.require( "amqp" ) +import amqp import logging log = logging.getLogger("GalaxyAMQP") @@ -45,7 +45,7 @@ log.addHandler(fh) # data transfer script -data_transfer_script = os.path.join( os.getcwd(), +data_transfer_script = os.path.join( os.getcwd(), "scripts/galaxy_messaging/server/data_transfer.py" ) global config global config_file_name @@ -54,13 +54,13 @@ def start_data_transfer( message ): # fork a new process to transfer datasets - cmd = '%s "%s" "%s" "%s"' % ( "python", - data_transfer_script, + cmd = '%s "%s" "%s" "%s"' % ( "python", + data_transfer_script, message.body, config_file_name ) # Galaxy config file name pid = subprocess.Popen(cmd, shell=True).pid log.debug('Started process (%i): %s' % (pid, str(cmd))) - + def update_sample_state( message ): dom = xml.dom.minidom.parseString(message.body) barcode = get_value(dom, 'barcode') @@ -69,7 +69,7 @@ log.debug('Barcode: ' + barcode) log.debug('State: ' + state) log.debug('API Key: ' + api_key) - # validate + # validate if not barcode or not state or not api_key: log.debug( 'Incomplete sample_state_update message received. Sample barcode, desired state and user API key is required.' ) return @@ -78,13 +78,13 @@ galaxydb = GalaxyDbInterface( dbconnstr ) sample_id = galaxydb.get_sample_id( field_name='bar_code', value=barcode ) if sample_id == -1: - log.debug( 'Invalid barcode.' ) + log.debug( 'Invalid barcode.' ) return galaxydb.change_state(sample_id, state) # after updating the sample state, update request status request_id = galaxydb.get_request_id(sample_id) update_request( api_key, request_id ) - + def update_request( api_key, request_id ): encoded_request_id = api.encode_id( config.get( "app:main", "id_secret" ), request_id ) data = dict( update_type=RequestsAPIController.update_types.REQUEST ) @@ -111,9 +111,9 @@ def main(): parser = optparse.OptionParser() - parser.add_option('-c', '--config-file', help='Galaxy configuration file', + parser.add_option('-c', '--config-file', help='Galaxy configuration file', dest='config_file', action='store') - parser.add_option('-s', '--http-server-section', help='Name of the HTTP server section in the Galaxy configuration file', + parser.add_option('-s', '--http-server-section', help='Name of the HTTP server section in the Galaxy configuration file', dest='http_server_section', action='store') (opts, args) = parser.parse_args() log.debug( "GALAXY LISTENER PID: " + str(os.getpid()) + " - " + str( opts ) ) @@ -129,33 +129,32 @@ for option in config.options("galaxy_amqp"): amqp_config[option] = config.get("galaxy_amqp", option) log.debug( str( amqp_config ) ) - # connect - conn = amqp.Connection(host=amqp_config['host']+":"+amqp_config['port'], - userid=amqp_config['userid'], - password=amqp_config['password'], - virtual_host=amqp_config['virtual_host'], - insist=False) + # connect + conn = amqp.Connection(host=amqp_config['host']+":"+amqp_config['port'], + userid=amqp_config['userid'], + password=amqp_config['password'], + virtual_host=amqp_config['virtual_host']) chan = conn.channel() - chan.queue_declare( queue=amqp_config['queue'], - durable=True, - exclusive=True, + chan.queue_declare( queue=amqp_config['queue'], + durable=True, + exclusive=True, auto_delete=False) - chan.exchange_declare( exchange=amqp_config['exchange'], - type="direct", - durable=True, - auto_delete=False,) - chan.queue_bind( queue=amqp_config['queue'], - exchange=amqp_config['exchange'], + chan.exchange_declare( exchange=amqp_config['exchange'], + type="direct", + durable=True, + auto_delete=False,) + chan.queue_bind( queue=amqp_config['queue'], + exchange=amqp_config['exchange'], routing_key=amqp_config['routing_key']) - chan.basic_consume( queue=amqp_config['queue'], - no_ack=True, - callback=recv_callback, + chan.basic_consume( queue=amqp_config['queue'], + no_ack=True, + callback=recv_callback, consumer_tag="testtag") log.debug('Connected to rabbitmq server - '+amqp_config['host']+":"+amqp_config['port']) while True: chan.wait() - chan.basic_cancel("testtag") + chan.basic_cancel("testtag") chan.close() conn.close() Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.