details: http://www.bx.psu.edu/hg/galaxy/rev/3f3712d36034 changeset: 2720:3f3712d36034 user: rc date: Fri Sep 18 15:18:57 2009 -0400 description: Galaxy AMQP Listener, first pass Added a check for testing completed requests in Requests functional tests 7 file(s) affected in this change: lib/galaxy/web/controllers/requests_admin.py lib/galaxy/web/framework/helpers/grids.py run_galaxy_listener.sh scripts/galaxy_messaging/amqp_consumer.py scripts/galaxy_messaging/galaxydb_interface.py test/functional/test_forms_and_requests.py universe_wsgi.ini.sample diffs (300 lines): diff -r 7f18940a4821 -r 3f3712d36034 lib/galaxy/web/controllers/requests_admin.py --- a/lib/galaxy/web/controllers/requests_admin.py Fri Sep 18 14:21:59 2009 -0400 +++ b/lib/galaxy/web/controllers/requests_admin.py Fri Sep 18 15:18:57 2009 -0400 @@ -98,7 +98,6 @@ else: self.request_grid.default_filter = dict(state=kwargs['show_filter'], deleted=False) self.request_grid.show_filter = kwargs.get('show_filter', trans.app.model.Request.states.SUBMITTED) - self.__update_request_state(trans) # Render the list view return self.request_grid( trans, **kwargs ) @web.expose @@ -980,12 +979,6 @@ request_id=request.id, msg='Bar codes has been saved for this request', messagetype='done')) - - def __update_request_state(self, trans): - requests = trans.app.model.Request.query.filter_by(deleted=False, - state=trans.app.model.Request.states.SUBMITTED) - for request in requests: - self.__set_request_state(request) def __set_request_state(self, request): # check if all the samples of the current request are in the final state diff -r 7f18940a4821 -r 3f3712d36034 lib/galaxy/web/framework/helpers/grids.py --- a/lib/galaxy/web/framework/helpers/grids.py Fri Sep 18 14:21:59 2009 -0400 +++ b/lib/galaxy/web/framework/helpers/grids.py Fri Sep 18 15:18:57 2009 -0400 @@ -121,7 +121,7 @@ return None def build_initial_query( self, session ): return session.query( self.model_class ) - def apply_default_filter( self, trans, query ): 
+ def apply_default_filter( self, trans, query, **kwargs): return query class GridColumn( object ): diff -r 7f18940a4821 -r 3f3712d36034 run_galaxy_listener.sh --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/run_galaxy_listener.sh Fri Sep 18 15:18:57 2009 -0400 @@ -0,0 +1,3 @@ +#!/bin/sh + +python scripts/galaxy_messaging/amqp_consumer.py \ No newline at end of file diff -r 7f18940a4821 -r 3f3712d36034 scripts/galaxy_messaging/amqp_consumer.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scripts/galaxy_messaging/amqp_consumer.py Fri Sep 18 15:18:57 2009 -0400 @@ -0,0 +1,68 @@ +from amqplib import client_0_8 as amqp +import ConfigParser +import sys +import optparse +import xml.dom.minidom +from galaxydb_interface import GalaxyDbInterface + +galaxy_config_file = 'universe_wsgi.ini' +global dbconnstr + +def get_value(dom, tag_name): + ''' + This method extracts the tag value from the xml message + ''' + nodelist = dom.getElementsByTagName(tag_name)[0].childNodes + rc = "" + for node in nodelist: + if node.nodeType == node.TEXT_NODE: + rc = rc + node.data + return rc + +def recv_callback(msg): + #print 'Received: ' + msg.body + ' from channel #' + str(msg.channel.channel_id) + dom = xml.dom.minidom.parseString(msg.body) + barcode = get_value(dom, 'barcode') + state = get_value(dom, 'state') + print barcode, state + # update the galaxy db + galaxy = GalaxyDbInterface(dbconnstr) + sample_id = galaxy.get_sample_id(field_name='bar_code', value=barcode) + if sample_id == -1: + print 'Invalid barcode.' 
+ return + galaxy.change_state(sample_id, state) + +def main(): + config = ConfigParser.ConfigParser() + config.read(galaxy_config_file) + global dbconnstr + dbconnstr = config.get("app:main", "database_connection") + amqp_config = {} + for option in config.options("galaxy:amqp"): + amqp_config[option] = config.get("galaxy:amqp", option) + print amqp_config + conn = amqp.Connection(host=amqp_config['host']+":"+amqp_config['port'], + userid=amqp_config['userid'], + password=amqp_config['password'], + virtual_host=amqp_config['virtual_host'], + insist=False) + chan = conn.channel() + chan.queue_declare(queue=amqp_config['queue'], durable=True, exclusive=True, auto_delete=False) + chan.exchange_declare(exchange=amqp_config['exchange'], type="direct", durable=True, auto_delete=False,) + chan.queue_bind(queue=amqp_config['queue'], + exchange=amqp_config['exchange'], + routing_key=amqp_config['routing_key']) + + chan.basic_consume(queue=amqp_config['queue'], + no_ack=True, + callback=recv_callback, + consumer_tag="testtag") + while True: + chan.wait() + chan.basic_cancel("testtag") + chan.close() + conn.close() + +if __name__ == '__main__': + main() \ No newline at end of file diff -r 7f18940a4821 -r 3f3712d36034 scripts/galaxy_messaging/galaxydb_interface.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scripts/galaxy_messaging/galaxydb_interface.py Fri Sep 18 15:18:57 2009 -0400 @@ -0,0 +1,141 @@ +#/usr/bin/python + +from sqlalchemy import * +from sqlalchemy.orm import sessionmaker +from datetime import datetime, timedelta +import sys +import optparse +import os +import time +import logging + +logging.basicConfig(level=logging.DEBUG) +log = logging.getLogger( 'GalaxyDbInterface' ) + +class GalaxyDbInterface(object): + + def __init__(self, dbstr): + self.dbstr = dbstr + self.db_engine = create_engine(self.dbstr) +# self.db_engine.echo = True + self.metadata = MetaData(self.db_engine) + self.session = sessionmaker(bind=self.db_engine) + self.event_table = 
Table('sample_event', self.metadata, autoload=True ) + self.sample_table = Table('sample', self.metadata, autoload=True ) + self.request_table = Table('request', self.metadata, autoload=True ) + self.state_table = Table('sample_state', self.metadata, autoload=True ) + + def get_sample_id(self, field_name='bar_code', value=None): + if not value: + return -1 + sample_id = -1 + if field_name =='name': + stmt = select(columns=[self.sample_table.c.id], + whereclause=self.sample_table.c.name==value) + result = stmt.execute() + sample_id = result.fetchone()[0] + elif field_name == 'bar_code': + stmt = select(columns=[self.sample_table.c.id], + whereclause=self.sample_table.c.bar_code==value) + result = stmt.execute() + x = result.fetchone() + if x: + sample_id = x[0] + log.debug('Sample ID: %i' % sample_id) + return sample_id + log.warning('This sample %s %s does not belong to any sample in the database.' % (field_name, value)) + return -1 + + def current_state(self, sample_id): + ''' + This method returns the current state of the sample for the given sample_id + ''' + stmt = select(columns=[self.event_table.c.sample_state_id], + whereclause=self.event_table.c.sample_id==sample_id, + order_by=self.event_table.c.update_time.desc()) + result = stmt.execute() + all_states = result.fetchall() + current_state_id = all_states[0][0] + return current_state_id + + def all_possible_states(self, sample_id): + subsubquery = select(columns=[self.sample_table.c.request_id], + whereclause=self.sample_table.c.id==sample_id) + self.request_id = subsubquery.execute().fetchall()[0][0] + log.debug('REQUESTID: %i' % self.request_id) + subquery = select(columns=[self.request_table.c.request_type_id], + whereclause=self.request_table.c.id==self.request_id) + request_type_id = subquery.execute().fetchall()[0][0] + log.debug('REQUESTTYPEID: %i' % request_type_id) + query = select(columns=[self.state_table.c.id, self.state_table.c.name], + 
whereclause=self.state_table.c.request_type_id==request_type_id, + order_by=self.state_table.c.id.asc()) + states = query.execute().fetchall() + log.debug('POSSIBLESTATES: '+ str(states)) + return states + + def change_state(self, sample_id, new_state=None): + ''' + This method changes the state of the sample to the the 'new_state' + ''' + if not new_state: + return + new_state_id = -1 + # find the state_id for this new state in the list of possible states + possible_states = self.all_possible_states(sample_id) + for state_id, state_name in possible_states: + if new_state == state_name: + new_state_id = state_id + if new_state_id == -1: + return + log.debug('Updating sample_id %i state to %s' % (sample_id, new_state)) + d = timedelta(hours=4) + i = self.event_table.insert() + i.execute(update_time=datetime.now()+d, + create_time=datetime.now()+d, + sample_id=sample_id, + sample_state_id=int(new_state_id), + comment='bar code scanner') + # if all the samples for this request are in the final state + # then change the request state to 'Complete' + result = select(columns=[self.sample_table.c.id], + whereclause=self.sample_table.c.request_id==self.request_id).execute() + sample_id_list = result.fetchall() + request_complete = True + for sid in sample_id_list: + current_state_id = self.current_state(sid[0]) + if current_state_id != possible_states[-1][0]: + request_complete = False + break + if request_complete: + request_state = 'Complete' + else: + request_state = 'Submitted' + log.debug('Updating request_id %i state to "%s"' % (self.request_id, request_state)) + d = timedelta(hours=4) + i = self.request_table.update(whereclause=self.request_table.c.id==self.request_id, + values={self.request_table.c.state: request_state}) + i.execute() + + + +if __name__ == '__main__': + print '''This file should not be run directly. 
To start the Galaxy AMQP Listener: + %sh run_galaxy_listener.sh''' +# dbstr = 'postgres://postgres:postgres@localhost/galaxy_ft' +# +# parser = optparse.OptionParser() +# parser.add_option('-n', '--name', help='name of the sample field', dest='name', \ +# action='store', default='bar_code') +# parser.add_option('-v', '--value', help='value of the sample field', dest='value', \ +# action='store') +# parser.add_option('-s', '--state', help='new state of the sample', dest='state', \ +# action='store') +# (opts, args) = parser.parse_args() +# +# gs = GalaxyDbInterface(dbstr) +# sample_id = gs.get_sample_id(field_name=opts.name, value=opts.value) +# gs.change_state(sample_id, opts.state) + + + diff -r 7f18940a4821 -r 3f3712d36034 test/functional/test_forms_and_requests.py --- a/test/functional/test_forms_and_requests.py Fri Sep 18 14:21:59 2009 -0400 +++ b/test/functional/test_forms_and_requests.py Fri Sep 18 15:18:57 2009 -0400 @@ -247,6 +247,8 @@ self.home() request_one.refresh() # check if the request's state is now set to 'complete' + self.visit_url('%s/requests_admin/list?show_filter=Complete' % self.url) + self.check_page_for_string( request_one.name ) assert request_one.state is not request_one.states.COMPLETE, "The state of the request '%s' should be set to '%s'" % ( request_one.name, request_one.states.COMPLETE ) # def test_40_admin_create_request_on_behalf_of_regular_user( self ): # """Testing creating and submitting a request as an admin on behalf of a regular user""" diff -r 7f18940a4821 -r 3f3712d36034 universe_wsgi.ini.sample --- a/universe_wsgi.ini.sample Fri Sep 18 14:21:59 2009 -0400 +++ b/universe_wsgi.ini.sample Fri Sep 18 15:18:57 2009 -0400 @@ -240,3 +240,22 @@ ucsc_table_direct_archaea1 = local:/// ucsc_table_direct_test1 = local:/// upload1 = local:/// + + +# Galaxy Message Queue +# Galaxy uses AMQ protocol to receive messages from external sources like +# bar code scanners. Galaxy has been tested against RabbitMQ AMQP implementation. 
+# For Galaxy to receive messages from a message queue, the RabbitMQ server has +# to be set up with a user account and the other parameters listed below. The 'host' +# and 'port' fields should point to where the RabbitMQ server is running. + +#[galaxy:amqp] +#host = 127.0.0.1 +#port = 5672 +#userid = galaxy +#password = galaxy +#virtual_host = galaxy_messaging_engine +#queue = galaxy_queue +#exchange = galaxy_exchange +#routing_key = bar_code_scanner +