details: http://www.bx.psu.edu/hg/galaxy/rev/5b2d593d9aed changeset: 2957:5b2d593d9aed user: rc date: Wed Nov 04 10:04:51 2009 -0500 description: AMQP messaging server and client files. diffstat: scripts/galaxy_messaging/amqp_consumer.py | 94 ------------- scripts/galaxy_messaging/client/amqp_publisher.py | 87 ++++++++++++ scripts/galaxy_messaging/client/galaxy_amq.ini.sample | 32 ++++ scripts/galaxy_messaging/client/report.bat.sample | 1 + scripts/galaxy_messaging/client/scan.bat.sample | 1 + scripts/galaxy_messaging/client/scan.sh.sample | 1 + scripts/galaxy_messaging/client/scanner.py | 92 +++++++++++++ scripts/galaxy_messaging/client/scanner_interface.py | 76 ++++++++++ scripts/galaxy_messaging/galaxydb_interface.py | 151 --------------------- scripts/galaxy_messaging/server/amqp_consumer.py | 94 +++++++++++++ scripts/galaxy_messaging/server/galaxydb_interface.py | 149 +++++++++++++++++++++ 11 files changed, 533 insertions(+), 245 deletions(-) diffs (829 lines): diff -r 984b1eb6c428 -r 5b2d593d9aed scripts/galaxy_messaging/amqp_consumer.py --- a/scripts/galaxy_messaging/amqp_consumer.py Wed Nov 04 09:32:09 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,94 +0,0 @@ -''' -Galaxy Messaging with AMQP (RabbitMQ) -Galaxy uses AMQ protocol to receive messages from external sources like -bar code scanners. Galaxy has been tested against RabbitMQ AMQP implementation. 
-For Galaxy to receive messages from a message queue the RabbitMQ server has -to be set up with a user account and other parameters listed in the [galaxy:amq] -section in the universe_wsgi.ini config file -Once the RabbitMQ server has been setup and started with the given parameters, -this script can be run to receive messages and update the Galaxy database accordingly -''' - -import ConfigParser -import sys, os -import optparse -import xml.dom.minidom -from galaxydb_interface import GalaxyDbInterface - -assert sys.version_info[:2] >= ( 2, 4 ) -new_path = [ os.path.join( os.getcwd(), "lib" ) ] -new_path.extend( sys.path[1:] ) # remove scripts/ from the path -sys.path = new_path - -from galaxy import eggs -import pkg_resources -pkg_resources.require( "amqplib" ) - -from amqplib import client_0_8 as amqp - -import logging -logging.basicConfig(level=logging.DEBUG) -log = logging.getLogger( 'GalaxyAMQP' ) - - -galaxy_config_file = 'universe_wsgi.ini' -global dbconnstr - -def get_value(dom, tag_name): - ''' - This method extracts the tag value from the xml message - ''' - nodelist = dom.getElementsByTagName(tag_name)[0].childNodes - rc = "" - for node in nodelist: - if node.nodeType == node.TEXT_NODE: - rc = rc + node.data - return rc - -def recv_callback(msg): - dom = xml.dom.minidom.parseString(msg.body) - barcode = get_value(dom, 'barcode') - state = get_value(dom, 'state') - log.debug('Barcode: '+barcode) - log.debug('State: '+state) - # update the galaxy db - galaxy = GalaxyDbInterface(dbconnstr) - sample_id = galaxy.get_sample_id(field_name='bar_code', value=barcode) - if sample_id == -1: - log.debug('Invalid barcode.') - return - galaxy.change_state(sample_id, state) - -def main(): - config = ConfigParser.ConfigParser() - config.read(galaxy_config_file) - global dbconnstr - dbconnstr = config.get("app:main", "database_connection") - amqp_config = {} - for option in config.options("galaxy:amqp"): - amqp_config[option] = config.get("galaxy:amqp", option) - 
log.debug(str(amqp_config)) - conn = amqp.Connection(host=amqp_config['host']+":"+amqp_config['port'], - userid=amqp_config['userid'], - password=amqp_config['password'], - virtual_host=amqp_config['virtual_host'], - insist=False) - chan = conn.channel() - chan.queue_declare(queue=amqp_config['queue'], durable=True, exclusive=True, auto_delete=False) - chan.exchange_declare(exchange=amqp_config['exchange'], type="direct", durable=True, auto_delete=False,) - chan.queue_bind(queue=amqp_config['queue'], - exchange=amqp_config['exchange'], - routing_key=amqp_config['routing_key']) - - chan.basic_consume(queue=amqp_config['queue'], - no_ack=True, - callback=recv_callback, - consumer_tag="testtag") - while True: - chan.wait() - chan.basic_cancel("testtag") - chan.close() - conn.close() - -if __name__ == '__main__': - main() \ No newline at end of file diff -r 984b1eb6c428 -r 5b2d593d9aed scripts/galaxy_messaging/client/amqp_publisher.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scripts/galaxy_messaging/client/amqp_publisher.py Wed Nov 04 10:04:51 2009 -0500 @@ -0,0 +1,87 @@ +''' +This script gets barcode data from a barcode scanner using serial communication +and sends the state represented by the barcode scanner & the barcode string +to the Galaxy LIMS RabbitMQ server. The message is sent in XML which has 2 tags, +barcode & state. The state of the scanner should be set in the galaxy_amq.ini +file as a configuration variable. 
+''' + +from amqplib import client_0_8 as amqp +import ConfigParser +import sys, os +import serial +import array +import time +import optparse + + +xml = \ +''' <sample> + <barcode>%(BARCODE)s</barcode> + <state>%(STATE)s</state> + </sample>''' + + +def handle_scan(states, amqp_config, barcode): + if states.get(barcode[:2], None): + values = dict( BARCODE=barcode[2:], + STATE=states.get(barcode[:2]) ) + print values + data = xml % values + print data + conn = amqp.Connection(host=amqp_config['host']+":"+amqp_config['port'], + userid=amqp_config['userid'], + password=amqp_config['password'], + virtual_host=amqp_config['virtual_host'], + insist=False) + chan = conn.channel() + msg = amqp.Message(data) + msg.properties["delivery_mode"] = 2 + chan.basic_publish(msg, + exchange=amqp_config['exchange'], + routing_key=amqp_config['routing_key']) + chan.close() + conn.close() + +def recv_data(states, amqp_config, s): + while True: + bytes = s.inWaiting() + if bytes: + print '%i bytes recvd' % bytes + msg = s.read(bytes) + print msg + handle_scan(states, amqp_config, msg.strip()) + + +def main(): + parser = optparse.OptionParser() + parser.add_option('-c', '--config-file', help='config file with all the AMQP config parameters', + dest='config_file', action='store') + parser.add_option('-p', '--port', help='Name of the port where the scanner is connected', + dest='port', action='store') + (opts, args) = parser.parse_args() + config = ConfigParser.ConfigParser() + config.read(opts.config_file) + amqp_config = {} + states = {} + for option in config.options("galaxy:amqp"): + amqp_config[option] = config.get("galaxy:amqp", option) + count = 1 + while True: + section = 'scanner%i' % count + if config.has_section(section): + states[config.get(section, 'prefix')] = config.get(section, 'state') + count = count + 1 + else: + break + print amqp_config + print states + s = serial.Serial(int(opts.port)) + print 'Port %s is open: %s' %( opts.port, s.isOpen()) + recv_data(states, 
amqp_config, s) + s.close() + print 'Port %s is open: %s' %( opts.port, s.isOpen()) + + +if __name__ == '__main__': + main() diff -r 984b1eb6c428 -r 5b2d593d9aed scripts/galaxy_messaging/client/galaxy_amq.ini.sample --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scripts/galaxy_messaging/client/galaxy_amq.ini.sample Wed Nov 04 10:04:51 2009 -0500 @@ -0,0 +1,32 @@ +# Galaxy Message Queue +# Galaxy uses AMQ protocol to receive messages from external sources like +# bar code scanners. Galaxy has been tested against RabbitMQ AMQP implementation. +# For Galaxy to receive messages from a message queue the RabbitMQ server has +# to be set up with a user account and other parameters listed below. The 'host' +# and 'port' fields should point to where the RabbitMQ server is running. + +#[galaxy:amqp] +#host = 127.0.0.1 +#port = 5672 +#userid = galaxy +#password = galaxy +#virtual_host = galaxy_messaging_engine +#queue = galaxy_queue +#exchange = galaxy_exchange +#routing_key = bar_code_scanner + +# The following section(s) 'scanner#' is for specifying the state of the +# sample this scanner represents. 
This state name should be one of the +# possible states created for this request type in Galaxy +# If there are multiple scanners attached to this host then add as many "scanner#" +# sections below each with the name & prefix of the bar code scanner and +# the state it represents +#[scanner1] +#name = +#state = +#prefix = + +#[scanner2] +#name = +#state = +#prefix = \ No newline at end of file diff -r 984b1eb6c428 -r 5b2d593d9aed scripts/galaxy_messaging/client/report.bat.sample --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scripts/galaxy_messaging/client/report.bat.sample Wed Nov 04 10:04:51 2009 -0500 @@ -0,0 +1,1 @@ +python scanner.py -p 2 -c galaxy_amq.ini -r \ No newline at end of file diff -r 984b1eb6c428 -r 5b2d593d9aed scripts/galaxy_messaging/client/scan.bat.sample --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scripts/galaxy_messaging/client/scan.bat.sample Wed Nov 04 10:04:51 2009 -0500 @@ -0,0 +1,1 @@ +python amqp_publisher.py -p 2 -c galaxy_amq.ini \ No newline at end of file diff -r 984b1eb6c428 -r 5b2d593d9aed scripts/galaxy_messaging/client/scan.sh.sample --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scripts/galaxy_messaging/client/scan.sh.sample Wed Nov 04 10:04:51 2009 -0500 @@ -0,0 +1,1 @@ +python amqp_publisher.py -p 3 -c galaxy_amq.ini \ No newline at end of file diff -r 984b1eb6c428 -r 5b2d593d9aed scripts/galaxy_messaging/client/scanner.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scripts/galaxy_messaging/client/scanner.py Wed Nov 04 10:04:51 2009 -0500 @@ -0,0 +1,92 @@ +import sys, os +import serial +import array +import time +import optparse +import ConfigParser, logging +from scanner_interface import ScannerInterface + +logging.basicConfig(level=logging.DEBUG) +log = logging.getLogger( 'Scanner' ) + +# command prefix: SYN M CR +cmd = [22, 77, 13] +response = { 6: 'ACK', 5: 'ENQ', 21: 'NAK' } +image_scanner_report = 'RPTSCN.' +get_prefix1 = 'PREBK2?.' +get_prefix2 = ':4820:PREBK2?.' +set_prefix = 'PREBK2995859.' 
+clear_prefix = 'PRECA2.' + +def get_prefix_cmd(name): + return ':' + name + ':' + 'PREBK2?.' + +def set_prefix_cmd(name, prefix): + prefix_str = '' + for c in prefix: + prefix_str = prefix_str + hex(ord(c))[2:] + return ':' + name + ':' + 'PREBK299' + prefix_str + '!' + +def read_config_file(config_file): + config = ConfigParser.ConfigParser() + config.read(config_file) + count = 1 + scanners_list = [] + while True: + section = 'scanner%i' % count + if config.has_section(section): + scanner = dict(name=config.get(section, 'name'), + prefix=config.get(section, 'prefix'), + state=config.get(section, 'state')) + scanners_list.append(scanner) + count = count + 1 + else: + return scanners_list + +def main(): + usage = "python %s -p PORT -c CONFIG_FILE [ OPTION ]" % sys.argv[0] + parser = optparse.OptionParser(usage=usage) + parser.add_option('-p', '--port', help='Name of the port where the scanner is connected', + dest='port', action='store') + parser.add_option('-c', '--config-file', help='config file with all the AMQP config parameters', + dest='config_file', action='store') + parser.add_option('-r', '--report', help='scanner report', + dest='report', action='store_true', default=False) + parser.add_option('-i', '--install', help='install the scanners', + dest='install', action='store_true', default=False) + (opts, args) = parser.parse_args() + # validate + if not opts.port: + parser.print_help() + sys.exit(0) + if ( opts.report or opts.install ) and not opts.config_file: + parser.print_help() + sys.exit(0) + + # create the scanner interface + si = ScannerInterface(opts.port) + if opts.install: + scanners_list = read_config_file(opts.config_file) + for scanner in scanners_list: + msg = set_prefix_cmd(scanner['name'], scanner['prefix']) + si.send(msg) + response = si.recv() + if not response: + log.error("Scanner %s could not be installed." 
% scanner['name']) + elif opts.report: + si.send(image_scanner_report) + rep = si.recv() + log.info(rep) + scanners_list = read_config_file(opts.config_file) + for scanner in scanners_list: + msg = get_prefix_cmd(scanner['name']) + si.send(msg) + response = si.recv() + if response: + log.info('PREFIX for scanner %s: %s' % (scanner['name'], chr(int(response[8:12][:2], 16))+chr(int(response[8:12][2:], 16)) )) + si.close() + + + +if __name__ == "__main__": + main() diff -r 984b1eb6c428 -r 5b2d593d9aed scripts/galaxy_messaging/client/scanner_interface.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scripts/galaxy_messaging/client/scanner_interface.py Wed Nov 04 10:04:51 2009 -0500 @@ -0,0 +1,76 @@ +import sys, os +import serial +import array +import time +import optparse +import ConfigParser +import logging + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger( 'ScannerInterface' ) + +class ScannerInterface( object ): + cmdprefix = [22, 77, 13] + response = { 6: 'ACK', 5: 'ENQ', 21: 'NAK' } + + def __init__( self, port ): + if os.name in ['posix', 'mac']: + self.port = port + elif os.name == 'nt': + self.port = int(port) + if self.port: + self.open() + + def open(self): + try: + self.serial_conn = serial.Serial(self.port) + except serial.SerialException: + log.exception('Unable to open port: %s' % str(self.port)) + sys.exit(1) + log.debug('Port %s is open: %s' %( str(self.port), self.serial_conn.isOpen() ) ) + + def is_open(self): + return self.serial_conn.isOpen() + + def close(self): + self.serial_conn.close() + log.debug('Port %s is open: %s' %( str(self.port), self.serial_conn.isOpen() ) ) + + def send(self, msg): + message = self.cmdprefix + map(ord, msg) + byte_array = array.array('B', message) + log.debug('Sending message to %s: %s' % ( str(self.port), message) ) + bytes = self.serial_conn.write( byte_array.tostring() ) + log.debug('%i bytes out of %i bytes sent to the scanner' % ( bytes, len(message) ) ) + + def recv(self): + time.sleep(1) + 
self.serial_conn.flush() + nbytes = self.serial_conn.inWaiting() + log.debug('%i bytes received' % nbytes) + if nbytes: + msg = self.serial_conn.read(nbytes) + byte_array = map(ord, msg) + log.debug('Message received [%s]: %s' % (self.response.get(byte_array[len(byte_array)-2], byte_array[len(byte_array)-2]), + msg)) + return msg + else: + log.error('Error!') + return None + + def setup_recv(self, callback): + self.recv_callback = callback + + def wait(self): + nbytes = self.serial_conn.inWaiting() + if nbytes: + msg = self.serial_conn.read(nbytes) + byte_array = map(ord, msg) + log.debug('Message received [%s]: %s' % (self.response.get(byte_array[len(byte_array)-2], byte_array[len(byte_array)-2], + msg))) + if self.recv_callback: + self.recv_callback(msg) + return + + + \ No newline at end of file diff -r 984b1eb6c428 -r 5b2d593d9aed scripts/galaxy_messaging/galaxydb_interface.py --- a/scripts/galaxy_messaging/galaxydb_interface.py Wed Nov 04 09:32:09 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,151 +0,0 @@ -#/usr/bin/python - -from datetime import datetime, timedelta -import sys -import optparse -import os -import time -import logging - -assert sys.version_info[:2] >= ( 2, 4 ) -new_path = [ os.path.join( os.getcwd(), "lib" ) ] -new_path.extend( sys.path[1:] ) # remove scripts/ from the path -sys.path = new_path -from galaxy import eggs -import pkg_resources -pkg_resources.require( "psycopg2" ) -import psycopg2 -pkg_resources.require( "SQLAlchemy >= 0.4" ) -from sqlalchemy import * -from sqlalchemy.orm import sessionmaker - -logging.basicConfig(level=logging.DEBUG) -log = logging.getLogger( 'GalaxyDbInterface' ) - -class GalaxyDbInterface(object): - - def __init__(self, dbstr): - self.dbstr = dbstr - self.db_engine = create_engine(self.dbstr) -# self.db_engine.echo = True - self.metadata = MetaData(self.db_engine) - self.session = sessionmaker(bind=self.db_engine) - self.event_table = Table('sample_event', self.metadata, autoload=True ) - 
self.sample_table = Table('sample', self.metadata, autoload=True ) - self.request_table = Table('request', self.metadata, autoload=True ) - self.state_table = Table('sample_state', self.metadata, autoload=True ) - - def get_sample_id(self, field_name='bar_code', value=None): - if not value: - return -1 - sample_id = -1 - if field_name =='name': - stmt = select(columns=[self.sample_table.c.id], - whereclause=self.sample_table.c.name==value) - result = stmt.execute() - sample_id = result.fetchone()[0] - elif field_name == 'bar_code': - stmt = select(columns=[self.sample_table.c.id], - whereclause=self.sample_table.c.bar_code==value) - result = stmt.execute() - x = result.fetchone() - if x: - sample_id = x[0] - log.debug('Sample ID: %i' % sample_id) - return sample_id - log.warning('This sample %s %s does not belong to any sample in the database.' % (field_name, value)) - return -1 - - def current_state(self, sample_id): - ''' - This method returns the current state of the sample for the given sample_id - ''' - stmt = select(columns=[self.event_table.c.sample_state_id], - whereclause=self.event_table.c.sample_id==sample_id, - order_by=self.event_table.c.update_time.desc()) - result = stmt.execute() - all_states = result.fetchall() - current_state_id = all_states[0][0] - return current_state_id - - def all_possible_states(self, sample_id): - subsubquery = select(columns=[self.sample_table.c.request_id], - whereclause=self.sample_table.c.id==sample_id) - self.request_id = subsubquery.execute().fetchall()[0][0] - log.debug('REQUESTID: %i' % self.request_id) - subquery = select(columns=[self.request_table.c.request_type_id], - whereclause=self.request_table.c.id==self.request_id) - request_type_id = subquery.execute().fetchall()[0][0] - log.debug('REQUESTTYPEID: %i' % request_type_id) - query = select(columns=[self.state_table.c.id, self.state_table.c.name], - whereclause=self.state_table.c.request_type_id==request_type_id, - order_by=self.state_table.c.id.asc()) - states 
= query.execute().fetchall() - log.debug('POSSIBLESTATES: '+ str(states)) - return states - - def change_state(self, sample_id, new_state=None): - ''' - This method changes the state of the sample to the the 'new_state' - ''' - if not new_state: - return - new_state_id = -1 - # find the state_id for this new state in the list of possible states - possible_states = self.all_possible_states(sample_id) - for state_id, state_name in possible_states: - if new_state == state_name: - new_state_id = state_id - if new_state_id == -1: - return - log.debug('Updating sample_id %i state to %s' % (sample_id, new_state)) - d = timedelta(hours=4) - i = self.event_table.insert() - i.execute(update_time=datetime.now()+d, - create_time=datetime.now()+d, - sample_id=sample_id, - sample_state_id=int(new_state_id), - comment='bar code scanner') - # if all the samples for this request are in the final state - # then change the request state to 'Complete' - result = select(columns=[self.sample_table.c.id], - whereclause=self.sample_table.c.request_id==self.request_id).execute() - sample_id_list = result.fetchall() - request_complete = True - for sid in sample_id_list: - current_state_id = self.current_state(sid[0]) - if current_state_id != possible_states[-1][0]: - request_complete = False - break - if request_complete: - request_state = 'Complete' - else: - request_state = 'Submitted' - log.debug('Updating request_id %i state to "%s"' % (self.request_id, request_state)) - d = timedelta(hours=4) - i = self.request_table.update(whereclause=self.request_table.c.id==self.request_id, - values={self.request_table.c.state: request_state}) - i.execute() - - - -if __name__ == '__main__': - print '''This file should not be run directly. 
To start the Galaxy AMQP Listener: - %sh run_galaxy_listener.sh''' -# dbstr = 'postgres://postgres:postgres@localhost/galaxy_ft' -# -# parser = optparse.OptionParser() -# parser.add_option('-n', '--name', help='name of the sample field', dest='name', \ -# action='store', default='bar_code') -# parser.add_option('-v', '--value', help='value of the sample field', dest='value', \ -# action='store') -# parser.add_option('-s', '--state', help='new state of the sample', dest='state', \ -# action='store') -# (opts, args) = parser.parse_args() -# -# gs = GalaxyDbInterface(dbstr) -# sample_id = gs.get_sample_id(field_name=opts.name, value=opts.value) -# gs.change_state(sample_id, opts.state) - - - diff -r 984b1eb6c428 -r 5b2d593d9aed scripts/galaxy_messaging/server/amqp_consumer.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scripts/galaxy_messaging/server/amqp_consumer.py Wed Nov 04 10:04:51 2009 -0500 @@ -0,0 +1,94 @@ +''' +Galaxy Messaging with AMQP (RabbitMQ) +Galaxy uses AMQ protocol to receive messages from external sources like +bar code scanners. Galaxy has been tested against RabbitMQ AMQP implementation. 
+For Galaxy to receive messages from a message queue the RabbitMQ server has +to be set up with a user account and other parameters listed in the [galaxy:amq] +section in the universe_wsgi.ini config file +Once the RabbitMQ server has been setup and started with the given parameters, +this script can be run to receive messages and update the Galaxy database accordingly +''' + +import ConfigParser +import sys, os +import optparse +import xml.dom.minidom +from galaxydb_interface import GalaxyDbInterface + +assert sys.version_info[:2] >= ( 2, 4 ) +new_path = [ os.path.join( os.getcwd(), "lib" ) ] +new_path.extend( sys.path[1:] ) # remove scripts/ from the path +sys.path = new_path + +from galaxy import eggs +import pkg_resources +pkg_resources.require( "amqplib" ) + +from amqplib import client_0_8 as amqp + +import logging +logging.basicConfig(level=logging.DEBUG) +log = logging.getLogger( 'GalaxyAMQP' ) + + +galaxy_config_file = 'universe_wsgi.ini' +global dbconnstr + +def get_value(dom, tag_name): + ''' + This method extracts the tag value from the xml message + ''' + nodelist = dom.getElementsByTagName(tag_name)[0].childNodes + rc = "" + for node in nodelist: + if node.nodeType == node.TEXT_NODE: + rc = rc + node.data + return rc + +def recv_callback(msg): + dom = xml.dom.minidom.parseString(msg.body) + barcode = get_value(dom, 'barcode') + state = get_value(dom, 'state') + log.debug('Barcode: '+barcode) + log.debug('State: '+state) + # update the galaxy db + galaxy = GalaxyDbInterface(dbconnstr) + sample_id = galaxy.get_sample_id(field_name='bar_code', value=barcode) + if sample_id == -1: + log.debug('Invalid barcode.') + return + galaxy.change_state(sample_id, state) + +def main(): + config = ConfigParser.ConfigParser() + config.read(galaxy_config_file) + global dbconnstr + dbconnstr = config.get("app:main", "database_connection") + amqp_config = {} + for option in config.options("galaxy:amqp"): + amqp_config[option] = config.get("galaxy:amqp", option) + 
log.debug(str(amqp_config)) + conn = amqp.Connection(host=amqp_config['host']+":"+amqp_config['port'], + userid=amqp_config['userid'], + password=amqp_config['password'], + virtual_host=amqp_config['virtual_host'], + insist=False) + chan = conn.channel() + chan.queue_declare(queue=amqp_config['queue'], durable=True, exclusive=True, auto_delete=False) + chan.exchange_declare(exchange=amqp_config['exchange'], type="direct", durable=True, auto_delete=False,) + chan.queue_bind(queue=amqp_config['queue'], + exchange=amqp_config['exchange'], + routing_key=amqp_config['routing_key']) + + chan.basic_consume(queue=amqp_config['queue'], + no_ack=True, + callback=recv_callback, + consumer_tag="testtag") + while True: + chan.wait() + chan.basic_cancel("testtag") + chan.close() + conn.close() + +if __name__ == '__main__': + main() \ No newline at end of file diff -r 984b1eb6c428 -r 5b2d593d9aed scripts/galaxy_messaging/server/galaxydb_interface.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scripts/galaxy_messaging/server/galaxydb_interface.py Wed Nov 04 10:04:51 2009 -0500 @@ -0,0 +1,149 @@ +#/usr/bin/python + +from datetime import datetime +import sys +import optparse +import os +import time +import logging + +assert sys.version_info[:2] >= ( 2, 4 ) +new_path = [ os.path.join( os.getcwd(), "lib" ) ] +new_path.extend( sys.path[1:] ) # remove scripts/ from the path +sys.path = new_path +from galaxy import eggs +import pkg_resources +pkg_resources.require( "psycopg2" ) +import psycopg2 +pkg_resources.require( "SQLAlchemy >= 0.4" ) +from sqlalchemy import * +from sqlalchemy.orm import sessionmaker + +logging.basicConfig(level=logging.DEBUG) +log = logging.getLogger( 'GalaxyDbInterface' ) + +class GalaxyDbInterface(object): + + def __init__(self, dbstr): + self.dbstr = dbstr + self.db_engine = create_engine(self.dbstr) +# self.db_engine.echo = True + self.metadata = MetaData(self.db_engine) + self.session = sessionmaker(bind=self.db_engine) + self.event_table = 
Table('sample_event', self.metadata, autoload=True ) + self.sample_table = Table('sample', self.metadata, autoload=True ) + self.request_table = Table('request', self.metadata, autoload=True ) + self.state_table = Table('sample_state', self.metadata, autoload=True ) + + def get_sample_id(self, field_name='bar_code', value=None): + if not value: + return -1 + sample_id = -1 + if field_name =='name': + stmt = select(columns=[self.sample_table.c.id], + whereclause=self.sample_table.c.name==value) + result = stmt.execute() + sample_id = result.fetchone()[0] + elif field_name == 'bar_code': + stmt = select(columns=[self.sample_table.c.id], + whereclause=self.sample_table.c.bar_code==value) + result = stmt.execute() + x = result.fetchone() + if x: + sample_id = x[0] + log.debug('Sample ID: %i' % sample_id) + return sample_id + log.warning('This sample %s %s does not belong to any sample in the database.' % (field_name, value)) + return -1 + + def current_state(self, sample_id): + ''' + This method returns the current state of the sample for the given sample_id + ''' + stmt = select(columns=[self.event_table.c.sample_state_id], + whereclause=self.event_table.c.sample_id==sample_id, + order_by=self.event_table.c.update_time.desc()) + result = stmt.execute() + all_states = result.fetchall() + current_state_id = all_states[0][0] + return current_state_id + + def all_possible_states(self, sample_id): + subsubquery = select(columns=[self.sample_table.c.request_id], + whereclause=self.sample_table.c.id==sample_id) + self.request_id = subsubquery.execute().fetchall()[0][0] + log.debug('REQUESTID: %i' % self.request_id) + subquery = select(columns=[self.request_table.c.request_type_id], + whereclause=self.request_table.c.id==self.request_id) + request_type_id = subquery.execute().fetchall()[0][0] + log.debug('REQUESTTYPEID: %i' % request_type_id) + query = select(columns=[self.state_table.c.id, self.state_table.c.name], + 
whereclause=self.state_table.c.request_type_id==request_type_id, + order_by=self.state_table.c.id.asc()) + states = query.execute().fetchall() + log.debug('POSSIBLESTATES: '+ str(states)) + return states + + def change_state(self, sample_id, new_state=None): + ''' + This method changes the state of the sample to the the 'new_state' + ''' + if not new_state: + return + new_state_id = -1 + # find the state_id for this new state in the list of possible states + possible_states = self.all_possible_states(sample_id) + for state_id, state_name in possible_states: + if new_state == state_name: + new_state_id = state_id + if new_state_id == -1: + return + log.debug('Updating sample_id %i state to %s' % (sample_id, new_state)) + i = self.event_table.insert() + i.execute(update_time=datetime.utcnow(), + create_time=datetime.utcnow(), + sample_id=sample_id, + sample_state_id=int(new_state_id), + comment='bar code scanner') + # if all the samples for this request are in the final state + # then change the request state to 'Complete' + result = select(columns=[self.sample_table.c.id], + whereclause=self.sample_table.c.request_id==self.request_id).execute() + sample_id_list = result.fetchall() + request_complete = True + for sid in sample_id_list: + current_state_id = self.current_state(sid[0]) + if current_state_id != possible_states[-1][0]: + request_complete = False + break + if request_complete: + request_state = 'Complete' + else: + request_state = 'Submitted' + log.debug('Updating request_id %i state to "%s"' % (self.request_id, request_state)) + i = self.request_table.update(whereclause=self.request_table.c.id==self.request_id, + values={self.request_table.c.state: request_state}) + i.execute() + + + +if __name__ == '__main__': + print '''This file should not be run directly. 
To start the Galaxy AMQP Listener: + %sh run_galaxy_listener.sh''' + dbstr = 'postgres://postgres:postgres@localhost/galaxy_uft' + + parser = optparse.OptionParser() + parser.add_option('-n', '--name', help='name of the sample field', dest='name', \ + action='store', default='bar_code') + parser.add_option('-v', '--value', help='value of the sample field', dest='value', \ + action='store') + parser.add_option('-s', '--state', help='new state of the sample', dest='state', \ + action='store') + (opts, args) = parser.parse_args() + + gs = GalaxyDbInterface(dbstr) + sample_id = gs.get_sample_id(field_name=opts.name, value=opts.value) + gs.change_state(sample_id, opts.state) + + +