1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/9b8d3dae6742/ Changeset: 9b8d3dae6742 User: natefoo Date: 2014-09-15 21:53:39 Summary: Remove old messaging code. Affected #: 14 files diff -r b4924e433ae2df18771a7e43c23287f4b9bc23dd -r 9b8d3dae67427121793b2390baca699f2887d600 run_galaxy_listener.sh --- a/run_galaxy_listener.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/sh - -cd `dirname $0` -python scripts/galaxy_messaging/server/amqp_consumer.py --config-file=universe_wsgi.ini --http-server-section=server:main 2>&1 \ No newline at end of file diff -r b4924e433ae2df18771a7e43c23287f4b9bc23dd -r 9b8d3dae67427121793b2390baca699f2887d600 scripts/galaxy_messaging/client/amqp_publisher.py --- a/scripts/galaxy_messaging/client/amqp_publisher.py +++ /dev/null @@ -1,100 +0,0 @@ -''' -This script gets barcode data from a barcode scanner using serial communication -and sends the state representated by the barcode scanner & the barcode string -to the Galaxy LIMS RabbitMQ server. The message is sent in XML which has 2 tags, -barcode & state. The state of the scanner should be set in the galaxy_amq.ini -file as a configuration variable. -''' - -import array -import ConfigParser -import optparse -import os -import serial -import sys -import time - -from galaxy import eggs -eggs.require("amqp") - -import amqp - - -xml = \ -''' <sample> - <barcode>%(BARCODE)s</barcode> - <state>%(STATE)s</state> - <api_key>%(API_KEY)s</api_key> - </sample>''' - - -def handle_scan(states, amqp_config, barcode): - if states.get(barcode[:2], None): - values = dict( BARCODE=barcode[2:], - STATE=states.get(barcode[:2]), - API_KEY=amqp_config['api_key'] ) - print values - data = xml % values - print data - conn = amqp.Connection(host=amqp_config['host']+":"+amqp_config['port'], - userid=amqp_config['userid'], - password=amqp_config['password'], - virtual_host=amqp_config['virtual_host'],) - - chan = conn.channel() - msg = amqp.Message(data, - content_type='text/plain', - application_headers={'msg_type': 'sample_state_update'}) - msg.properties["delivery_mode"] = 2 - chan.basic_publish(msg, - exchange=amqp_config['exchange'], - routing_key=amqp_config['routing_key']) - chan.close() - conn.close() - -def recv_data(states, amqp_config, s): - while True: - bytes = s.inWaiting() - if bytes: - print '%i bytes recvd' % bytes - msg = s.read(bytes) - print msg - handle_scan(states, amqp_config, msg.strip()) - - -def main(): - parser = optparse.OptionParser() - parser.add_option('-c', '--config-file', help='config file with all the AMQP config parameters', - dest='config_file', action='store') - parser.add_option('-p', '--port', help='Name of the port where the scanner is connected', - dest='port', action='store') - (opts, args) = parser.parse_args() - config = ConfigParser.ConfigParser() - config.read(opts.config_file) - amqp_config = {} - states = {} - for option in config.options("galaxy:amqp"): - amqp_config[option] = config.get("galaxy:amqp", option) - # abort if api_key is not set in the config file - if not amqp_config['api_key']: - print 'Error: Set the api_key config variable in the config file before starting the amqp_publisher script.' - sys.exit( 1 ) - count = 1 - while True: - section = 'scanner%i' % count - if config.has_section(section): - states[config.get(section, 'prefix')] = config.get(section, 'state') - count = count + 1 - else: - break - print amqp_config - print states - s = serial.Serial(int(opts.port)) - print 'Port %s is open: %s' %( opts.port, s.isOpen()) - recv_data(states, amqp_config, s) - s.close() - print 'Port %s is open: %s' %( opts.port, s.isOpen()) - - -if __name__ == '__main__': - main() diff -r b4924e433ae2df18771a7e43c23287f4b9bc23dd -r 9b8d3dae67427121793b2390baca699f2887d600 scripts/galaxy_messaging/client/galaxy_amq.ini.sample --- a/scripts/galaxy_messaging/client/galaxy_amq.ini.sample +++ /dev/null @@ -1,33 +0,0 @@ -# Galaxy Message Queue -# Galaxy uses AMQ protocol to receive messages from external sources like -# bar code scanners. Galaxy has been tested against RabbitMQ AMQP implementation. -# For Galaxy to receive messages from a message queue the RabbitMQ server has -# to be set up with a user account and other parameters listed below. The 'host' -# and 'port' fields should point to where the RabbitMQ server is running. - -#[galaxy:amqp] -#host = 127.0.0.1 -#port = 5672 -#userid = galaxy -#password = galaxy -#virtual_host = galaxy_messaging_engine -#queue = galaxy_queue -#exchange = galaxy_exchange -#routing_key = bar_code_scanner -#api_key = - -# The following section(s) 'scanner#' is for specifying the state of the -# sample this scanner represents. This state name should be one of the -# possible states created for this request type in Galaxy -# If there multiple scanners attached to this host the add as many "scanner#" -# sections below each with the name & prefix of the bar code scanner and -# the state it represents -#[scanner1] -#name = -#state = -#prefix = - -#[scanner2] -#name = -#state = -#prefix = \ No newline at end of file diff -r b4924e433ae2df18771a7e43c23287f4b9bc23dd -r 9b8d3dae67427121793b2390baca699f2887d600 scripts/galaxy_messaging/client/report.bat.sample --- a/scripts/galaxy_messaging/client/report.bat.sample +++ /dev/null @@ -1,1 +0,0 @@ -python scanner.py -p 2 -c galaxy_amq.ini -r \ No newline at end of file diff -r b4924e433ae2df18771a7e43c23287f4b9bc23dd -r 9b8d3dae67427121793b2390baca699f2887d600 scripts/galaxy_messaging/client/scan.bat.sample --- a/scripts/galaxy_messaging/client/scan.bat.sample +++ /dev/null @@ -1,1 +0,0 @@ -python amqp_publisher.py -p 2 -c galaxy_amq.ini \ No newline at end of file diff -r b4924e433ae2df18771a7e43c23287f4b9bc23dd -r 9b8d3dae67427121793b2390baca699f2887d600 scripts/galaxy_messaging/client/scan.sh.sample --- a/scripts/galaxy_messaging/client/scan.sh.sample +++ /dev/null @@ -1,1 +0,0 @@ -python amqp_publisher.py -p 3 -c galaxy_amq.ini \ No newline at end of file diff -r b4924e433ae2df18771a7e43c23287f4b9bc23dd -r 9b8d3dae67427121793b2390baca699f2887d600 scripts/galaxy_messaging/client/scanner.py --- a/scripts/galaxy_messaging/client/scanner.py +++ /dev/null @@ -1,92 +0,0 @@ -import sys, os -import serial -import array -import time -import optparse -import ConfigParser, logging -from scanner_interface import ScannerInterface - -logging.basicConfig(level=logging.DEBUG) -log = logging.getLogger( 'Scanner' ) - -# command prefix: SYN M CR -cmd = [22, 77, 13] -response = { 6: 'ACK', 5: 'ENQ', 21: 'NAK' } -image_scanner_report = 'RPTSCN.' -get_prefix1 = 'PREBK2?.' -get_prefix2 = ':4820:PREBK2?.' -set_prefix = 'PREBK2995859.' -clear_prefix = 'PRECA2.' - -def get_prefix_cmd(name): - return ':' + name + ':' + 'PREBK2?.' - -def set_prefix_cmd(name, prefix): - prefix_str = '' - for c in prefix: - prefix_str = prefix_str + hex(ord(c))[2:] - return ':' + name + ':' + 'PREBK299' + prefix_str + '!' - -def read_config_file(config_file): - config = ConfigParser.ConfigParser() - config.read(config_file) - count = 1 - scanners_list = [] - while True: - section = 'scanner%i' % count - if config.has_section(section): - scanner = dict(name=config.get(section, 'name'), - prefix=config.get(section, 'prefix'), - state=config.get(section, 'state')) - scanners_list.append(scanner) - count = count + 1 - else: - return scanners_list - -def main(): - usage = "python %s -p PORT -c CONFIG_FILE [ OPTION ]" % sys.argv[0] - parser = optparse.OptionParser(usage=usage) - parser.add_option('-p', '--port', help='Name of the port where the scanner is connected', - dest='port', action='store') - parser.add_option('-c', '--config-file', help='config file with all the AMQP config parameters', - dest='config_file', action='store') - parser.add_option('-r', '--report', help='scanner report', - dest='report', action='store_true', default=False) - parser.add_option('-i', '--install', help='install the scanners', - dest='install', action='store_true', default=False) - (opts, args) = parser.parse_args() - # validate - if not opts.port: - parser.print_help() - sys.exit(0) - if ( opts.report or opts.install ) and not opts.config_file: - parser.print_help() - sys.exit(0) - - # create the scanner interface - si = ScannerInterface(opts.port) - if opts.install: - scanners_list = read_config_file(opts.config_file) - for scanner in scanners_list: - msg = set_prefix_cmd(scanner['name'], scanner['prefix']) - si.send(msg) - response = si.recv() - if not response: - log.error("Scanner %s could not be installed." % scanner['name']) - elif opts.report: - si.send(image_scanner_report) - rep = si.recv() - log.info(rep) - scanners_list = read_config_file(opts.config_file) - for scanner in scanners_list: - msg = get_prefix_cmd(scanner['name']) - si.send(msg) - response = si.recv() - if response: - log.info('PREFIX for scanner %s: %s' % (scanner['name'], chr(int(response[8:12][:2], 16))+chr(int(response[8:12][2:], 16)) )) - si.close() - - - -if __name__ == "__main__": - main() diff -r b4924e433ae2df18771a7e43c23287f4b9bc23dd -r 9b8d3dae67427121793b2390baca699f2887d600 scripts/galaxy_messaging/client/scanner_interface.py --- a/scripts/galaxy_messaging/client/scanner_interface.py +++ /dev/null @@ -1,76 +0,0 @@ -import sys, os -import serial -import array -import time -import optparse -import ConfigParser -import logging - -logging.basicConfig(level=logging.INFO) -log = logging.getLogger( 'ScannerInterface' ) - -class ScannerInterface( object ): - cmdprefix = [22, 77, 13] - response = { 6: 'ACK', 5: 'ENQ', 21: 'NAK' } - - def __init__( self, port ): - if os.name in ['posix', 'mac']: - self.port = port - elif os.name == 'nt': - self.port = int(port) - if self.port: - self.open() - - def open(self): - try: - self.serial_conn = serial.Serial(self.port) - except serial.SerialException: - log.exception('Unable to open port: %s' % str(self.port)) - sys.exit(1) - log.debug('Port %s is open: %s' %( str(self.port), self.serial_conn.isOpen() ) ) - - def is_open(self): - return self.serial_conn.isOpen() - - def close(self): - self.serial_conn.close() - log.debug('Port %s is open: %s' %( str(self.port), self.serial_conn.isOpen() ) ) - - def send(self, msg): - message = self.cmdprefix + map(ord, msg) - byte_array = array.array('B', message) - log.debug('Sending message to %s: %s' % ( str(self.port), message) ) - bytes = self.serial_conn.write( byte_array.tostring() ) - log.debug('%i bytes out of %i bytes sent to the scanner' % ( bytes, len(message) ) ) - - def recv(self): - time.sleep(1) - self.serial_conn.flush() - nbytes = self.serial_conn.inWaiting() - log.debug('%i bytes received' % nbytes) - if nbytes: - msg = self.serial_conn.read(nbytes) - byte_array = map(ord, msg) - log.debug('Message received [%s]: %s' % (self.response.get(byte_array[len(byte_array)-2], byte_array[len(byte_array)-2]), - msg)) - return msg - else: - log.error('Error!') - return None - - def setup_recv(self, callback): - self.recv_callback = callback - - def wait(self): - nbytes = self.serial_conn.inWaiting() - if nbytes: - msg = self.serial_conn.read(nbytes) - byte_array = map(ord, msg) - log.debug('Message received [%s]: %s' % (self.response.get(byte_array[len(byte_array)-2], byte_array[len(byte_array)-2], - msg))) - if self.recv_callback: - self.recv_callback(msg) - return - - - diff -r b4924e433ae2df18771a7e43c23287f4b9bc23dd -r 9b8d3dae67427121793b2390baca699f2887d600 scripts/galaxy_messaging/server/amqp_consumer.py --- a/scripts/galaxy_messaging/server/amqp_consumer.py +++ /dev/null @@ -1,164 +0,0 @@ -''' -Galaxy Messaging with AMQP (RabbitMQ) -Galaxy uses AMQ protocol to receive messages from external sources like -bar code scanners. Galaxy has been tested against RabbitMQ AMQP implementation. -For Galaxy to receive messages from a message queue the RabbitMQ server has -to be set up with a user account and other parameters listed in the [galaxy_amqp] -section in the universe_wsgi.ini config file -Once the RabbitMQ server has been setup and started with the given parameters, -this script can be run to receive messages and update the Galaxy database accordingly -''' - -import ConfigParser -import sys, os -import optparse -import xml.dom.minidom -import subprocess -import urllib2 - -from xml_helper import get_value, get_value_index - -from galaxydb_interface import GalaxyDbInterface - -api_path = [ os.path.join( os.getcwd(), "scripts/api" ) ] -sys.path.extend( api_path ) -import common as api - -assert sys.version_info[:2] >= ( 2, 4 ) -new_path = [ os.path.join( os.getcwd(), "lib" ) ] -new_path.extend( sys.path[1:] ) # remove scripts/ from the path -sys.path = new_path - -from galaxy import eggs -from galaxy.web.api.requests import RequestsAPIController -import pkg_resources -pkg_resources.require( "amqp" ) -import amqp - -import logging -log = logging.getLogger("GalaxyAMQP") -log.setLevel(logging.DEBUG) -fh = logging.FileHandler("galaxy_listener.log") -fh.setLevel(logging.DEBUG) -formatter = logging.Formatter("%(asctime)s - %(name)s - %(message)s") -fh.setFormatter(formatter) -log.addHandler(fh) - -# data transfer script -data_transfer_script = os.path.join( os.getcwd(), - "scripts/galaxy_messaging/server/data_transfer.py" ) -global config -global config_file_name -global http_server_section - - -def start_data_transfer( message ): - # fork a new process to transfer datasets - cmd = '%s "%s" "%s" "%s"' % ( "python", - data_transfer_script, - message.body, - config_file_name ) # Galaxy config file name - pid = subprocess.Popen(cmd, shell=True).pid - log.debug('Started process (%i): %s' % (pid, str(cmd))) - -def update_sample_state( message ): - dom = xml.dom.minidom.parseString(message.body) - barcode = get_value(dom, 'barcode') - state = get_value(dom, 'state') - api_key = get_value(dom, 'api_key') - log.debug('Barcode: ' + barcode) - log.debug('State: ' + state) - log.debug('API Key: ' + api_key) - # validate - if not barcode or not state or not api_key: - log.debug( 'Incomplete sample_state_update message received. Sample barcode, desired state and user API key is required.' ) - return - # update the sample state in Galaxy db - dbconnstr = config.get("app:main", "database_connection") - galaxydb = GalaxyDbInterface( dbconnstr ) - sample_id = galaxydb.get_sample_id( field_name='bar_code', value=barcode ) - if sample_id == -1: - log.debug( 'Invalid barcode.' ) - return - galaxydb.change_state(sample_id, state) - # after updating the sample state, update request status - request_id = galaxydb.get_request_id(sample_id) - update_request( api_key, request_id ) - -def update_request( api_key, request_id ): - encoded_request_id = api.encode_id( config.get( "app:main", "id_secret" ), request_id ) - data = dict( update_type=RequestsAPIController.update_types.REQUEST ) - url = "http://%s:%s/api/requests/%s" % ( config.get(http_server_section, "host"), - config.get(http_server_section, "port"), - encoded_request_id ) - log.debug( 'Updating request %i' % request_id ) - try: - retval = api.update( api_key, url, data, return_formatted=False ) - log.debug( str( retval ) ) - except Exception, e: - log.debug( 'ERROR(update_request (%s)): %s' % ( str((self.api_key, url, data)), str(e) ) ) - -def recv_callback( message ): - # check the meesage type. - msg_type = message.properties['application_headers'].get('msg_type') - log.debug( 'MESSAGE RECVD: ' + str( msg_type ) ) - if msg_type == 'data_transfer': - log.debug( 'DATA TRANSFER' ) - start_data_transfer( message ) - elif msg_type == 'sample_state_update': - log.debug( 'SAMPLE STATE UPDATE' ) - update_sample_state( message ) - -def main(): - parser = optparse.OptionParser() - parser.add_option('-c', '--config-file', help='Galaxy configuration file', - dest='config_file', action='store') - parser.add_option('-s', '--http-server-section', help='Name of the HTTP server section in the Galaxy configuration file', - dest='http_server_section', action='store') - (opts, args) = parser.parse_args() - log.debug( "GALAXY LISTENER PID: " + str(os.getpid()) + " - " + str( opts ) ) - # read the Galaxy config file - global config_file_name - config_file_name = opts.config_file - global config - config = ConfigParser.ConfigParser() - config.read( opts.config_file ) - global http_server_section - http_server_section = opts.http_server_section - amqp_config = {} - for option in config.options("galaxy_amqp"): - amqp_config[option] = config.get("galaxy_amqp", option) - log.debug( str( amqp_config ) ) - # connect - conn = amqp.Connection(host=amqp_config['host']+":"+amqp_config['port'], - userid=amqp_config['userid'], - password=amqp_config['password'], - virtual_host=amqp_config['virtual_host']) - chan = conn.channel() - chan.queue_declare( queue=amqp_config['queue'], - durable=True, - exclusive=True, - auto_delete=False) - chan.exchange_declare( exchange=amqp_config['exchange'], - type="direct", - durable=True, - auto_delete=False,) - chan.queue_bind( queue=amqp_config['queue'], - exchange=amqp_config['exchange'], - routing_key=amqp_config['routing_key']) - - chan.basic_consume( queue=amqp_config['queue'], - no_ack=True, - callback=recv_callback, - consumer_tag="testtag") - log.debug('Connected to rabbitmq server - '+amqp_config['host']+":"+amqp_config['port']) - while True: - chan.wait() - chan.basic_cancel("testtag") - chan.close() - conn.close() - -if __name__ == '__main__': - main() - - diff -r b4924e433ae2df18771a7e43c23287f4b9bc23dd -r 9b8d3dae67427121793b2390baca699f2887d600 scripts/galaxy_messaging/server/data_transfer.py --- a/scripts/galaxy_messaging/server/data_transfer.py +++ /dev/null @@ -1,239 +0,0 @@ -#!/usr/bin/env python -""" - -Data Transfer Script: Sequencer to Galaxy - -This script is called from Galaxy RabbitMQ listener ( amqp_consumer.py ) once -the lab admin starts the data transfer process using the user interface. - -Usage: - -python data_transfer.py <config_file> - - -""" -import ConfigParser -import cookielib -import datetime -import logging -import optparse -import os -import shutil -import sys -import time -import time -import traceback -import urllib -import urllib2 -import xml.dom.minidom - - -from xml_helper import get_value, get_value_index - -log = logging.getLogger( "datatx_" + str( os.getpid() ) ) -log.setLevel( logging.DEBUG ) -fh = logging.FileHandler( "data_transfer.log" ) -fh.setLevel( logging.DEBUG ) -formatter = logging.Formatter( "%(asctime)s - %(name)s - %(message)s" ) -fh.setFormatter( formatter ) -log.addHandler( fh ) - -api_path = [ os.path.join( os.getcwd(), "scripts/api" ) ] -sys.path.extend( api_path ) -import common as api - -assert sys.version_info[:2] >= ( 2, 4 ) -new_path = [ os.path.join( os.getcwd(), "lib" ) ] -new_path.extend( sys.path[1:] ) # remove scripts/ from the path -sys.path = new_path - -from galaxy import eggs -from galaxy.model import SampleDataset -from galaxy.web.api.samples import SamplesAPIController -import pkg_resources -pkg_resources.require( "pexpect" ) -import pexpect - -log.debug(str(dir(api))) - -class DataTransfer( object ): - - def __init__( self, msg, config_file ): - log.info( msg ) - self.dom = xml.dom.minidom.parseString( msg ) - self.galaxy_host = get_value( self.dom, 'galaxy_host' ) - self.api_key = get_value( self.dom, 'api_key' ) - self.sequencer_host = get_value( self.dom, 'data_host' ) - self.sequencer_username = get_value( self.dom, 'data_user' ) - self.sequencer_password = get_value( self.dom, 'data_password' ) - self.request_id = get_value( self.dom, 'request_id' ) - self.sample_id = get_value( self.dom, 'sample_id' ) - self.library_id = get_value( self.dom, 'library_id' ) - self.folder_id = get_value( self.dom, 'folder_id' ) - self.dataset_files = [] - count=0 - while True: - dataset_id = get_value_index( self.dom, 'dataset_id', count ) - file = get_value_index( self.dom, 'file', count ) - name = get_value_index( self.dom, 'name', count ) - if file: - self.dataset_files.append( dict( name=name, - dataset_id=int( dataset_id ), - file=file ) ) - else: - break - count=count+1 - # read config variables - config = ConfigParser.ConfigParser() - retval = config.read( config_file ) - if not retval: - error_msg = 'FATAL ERROR: Unable to open config file %s.' % config_file - log.error( error_msg ) - sys.exit(1) - try: - self.config_id_secret = config.get( "app:main", "id_secret" ) - except ConfigParser.NoOptionError,e: - self.config_id_secret = "USING THE DEFAULT IS NOT SECURE!" - try: - self.import_dir = config.get( "app:main", "library_import_dir" ) - except ConfigParser.NoOptionError,e: - log.error( 'ERROR: library_import_dir config variable is not set in %s. ' % config_file ) - sys.exit( 1 ) - # create the destination directory within the import directory - self.server_dir = os.path.join( self.import_dir, - 'datatx_' + str( os.getpid() ) + '_' + datetime.date.today( ).strftime( "%d%b%Y" ) ) - try: - os.mkdir( self.server_dir ) - except Exception, e: - self.error_and_exit( str( e ) ) - - def start( self ): - ''' - This method executes the file transfer from the sequencer, adds the dataset - to the data library & finally updates the data transfer status in the db - ''' - # datatx - self.transfer_files() - # add the dataset to the given library - self.add_to_library() - # update the data transfer status in the db - self.update_status( SampleDataset.transfer_status.COMPLETE ) - # cleanup - #self.cleanup() - sys.exit( 0 ) - - def cleanup( self ): - ''' - remove the directory created to store the dataset files temporarily - before adding the same to the data library - ''' - try: - time.sleep( 60 ) - shutil.rmtree( self.server_dir ) - except: - self.error_and_exit() - - - def error_and_exit( self, msg='' ): - ''' - This method is called any exception is raised. This prints the traceback - and terminates this script - ''' - log.error( traceback.format_exc() ) - log.error( 'FATAL ERROR.' + msg ) - self.update_status( 'Error', 'All', msg ) - sys.exit( 1 ) - - def transfer_files( self ): - ''' - This method executes a scp process using pexpect library to transfer - the dataset file from the remote sequencer to the Galaxy server - ''' - def print_ticks( d ): - pass - for i, dataset_file in enumerate( self.dataset_files ): - self.update_status( SampleDataset.transfer_status.TRANSFERRING, dataset_file[ 'dataset_id' ] ) - try: - cmd = "scp %s@%s:'%s' '%s/%s'" % ( self.sequencer_username, - self.sequencer_host, - dataset_file[ 'file' ].replace( ' ', '\ ' ), - self.server_dir.replace( ' ', '\ ' ), - dataset_file[ 'name' ].replace( ' ', '\ ' ) ) - log.debug( cmd ) - output = pexpect.run( cmd, - events={ '.ssword:*': self.sequencer_password+'\r\n', - pexpect.TIMEOUT: print_ticks }, - timeout=10 ) - log.debug( output ) - path = os.path.join( self.server_dir, os.path.basename( dataset_file[ 'name' ] ) ) - if not os.path.exists( path ): - msg = 'Could not find the local file after transfer ( %s ).' % path - log.error( msg ) - raise Exception( msg ) - except Exception, e: - msg = traceback.format_exc() - self.update_status( 'Error', dataset_file['dataset_id'], msg) - - - def add_to_library( self ): - ''' - This method adds the dataset file to the target data library & folder - by opening the corresponding url in Galaxy server running. - ''' - self.update_status( SampleDataset.transfer_status.ADD_TO_LIBRARY ) - try: - data = {} - data[ 'folder_id' ] = 'F%s' % api.encode_id( self.config_id_secret, self.folder_id ) - data[ 'file_type' ] = 'auto' - data[ 'server_dir' ] = self.server_dir - data[ 'dbkey' ] = '' - data[ 'upload_option' ] = 'upload_directory' - data[ 'create_type' ] = 'file' - url = "http://%s/api/libraries/%s/contents" % ( self.galaxy_host, - api.encode_id( self.config_id_secret, self.library_id ) ) - log.debug( str( ( self.api_key, url, data ) ) ) - retval = api.submit( self.api_key, url, data, return_formatted=False ) - log.debug( str( retval ) ) - except Exception, e: - self.error_and_exit( str( e ) ) - - def update_status( self, status, dataset_id='All', msg='' ): - ''' - Update the data transfer status for this dataset in the database - ''' - try: - log.debug( 'Setting status "%s" for dataset "%s" of sample "%s"' % ( status, str( dataset_id ), str( self.sample_id) ) ) - sample_dataset_ids = [] - if dataset_id == 'All': - for dataset in self.dataset_files: - sample_dataset_ids.append( api.encode_id( self.config_id_secret, dataset[ 'dataset_id' ] ) ) - else: - sample_dataset_ids.append( api.encode_id( self.config_id_secret, dataset_id ) ) - # update the transfer status - data = {} - data[ 'update_type' ] = SamplesAPIController.update_types.SAMPLE_DATASET[0] - data[ 'sample_dataset_ids' ] = sample_dataset_ids - data[ 'new_status' ] = status - data[ 'error_msg' ] = msg - url = "http://%s/api/samples/%s" % ( self.galaxy_host, - api.encode_id( self.config_id_secret, self.sample_id ) ) - log.debug( str( ( self.api_key, url, data))) - retval = api.update( self.api_key, url, data, return_formatted=False ) - log.debug( str( retval ) ) - except urllib2.URLError, e: - log.debug( 'ERROR( sample_dataset_transfer_status ( %s ) ): %s' % ( url, str( e ) ) ) - log.error( traceback.format_exc() ) - except: - log.error( traceback.format_exc() ) - log.error( 'FATAL ERROR' ) - sys.exit( 1 ) - -if __name__ == '__main__': - log.info( 'STARTING %i %s' % ( os.getpid(), str( sys.argv ) ) ) - # - # Start the daemon - # - dt = DataTransfer( sys.argv[1], sys.argv[2]) - dt.start() - sys.exit( 0 ) - diff -r b4924e433ae2df18771a7e43c23287f4b9bc23dd -r 9b8d3dae67427121793b2390baca699f2887d600 scripts/galaxy_messaging/server/galaxydb_interface.py --- a/scripts/galaxy_messaging/server/galaxydb_interface.py +++ /dev/null @@ -1,167 +0,0 @@ -#/usr/bin/python - -from datetime import datetime -import sys -import optparse -import os -import time -import logging - -assert sys.version_info[:2] >= ( 2, 4 ) -new_path = [ os.path.join( os.getcwd(), "lib" ) ] -new_path.extend( sys.path[1:] ) # remove scripts/ from the path -sys.path = new_path -from galaxy import eggs -from galaxy.model.custom_types import * -import pkg_resources -pkg_resources.require( "psycopg2" ) -import psycopg2 -pkg_resources.require( "SQLAlchemy >= 0.4" ) -from sqlalchemy import * -from sqlalchemy.orm import sessionmaker - -#logging.basicConfig(level=logging.DEBUG) -#log = logging.getLogger( 'GalaxyDbInterface' ) - -class GalaxyDbInterface(object): - - def __init__(self, dbstr): - self.dbstr = dbstr - self.db_engine = create_engine(self.dbstr) - self.db_engine.echo = False - self.metadata = MetaData(self.db_engine) - self.session = sessionmaker(bind=self.db_engine) - self.event_table = Table('sample_event', self.metadata, autoload=True ) - self.sample_table = Table('sample', self.metadata, autoload=True ) - self.sample_dataset_table = Table('sample_dataset', self.metadata, autoload=True ) - self.request_table = Table('request', self.metadata, autoload=True ) - self.request_event_table = Table('request_event', self.metadata, autoload=True ) - self.state_table = Table('sample_state', self.metadata, autoload=True ) - - def get_sample_id(self, field_name='bar_code', value=None): - if not value: - return -1 - sample_id = -1 - if field_name =='name': - stmt = select(columns=[self.sample_table.c.id], - whereclause=self.sample_table.c.name==value) - result = stmt.execute() - sample_id = result.fetchone()[0] - elif field_name == 'bar_code': - stmt = select(columns=[self.sample_table.c.id], - whereclause=self.sample_table.c.bar_code==value) - result = stmt.execute() - x = result.fetchone() - if x: - sample_id = x[0] - #log.debug('Sample ID: %i' % sample_id) - return sample_id - return -1 - - def get_request_id(self, sample_id): - query = select(columns=[self.sample_table.c.request_id], - whereclause=self.sample_table.c.id==sample_id) - request_id = query.execute().fetchall()[0][0] - return request_id - - def current_state(self, sample_id): - ''' - This method returns the current state of the sample for the given sample_id - ''' - stmt = select(columns=[self.event_table.c.sample_state_id], - whereclause=self.event_table.c.sample_id==sample_id, - order_by=self.event_table.c.update_time.desc()) - result = stmt.execute() - all_states = result.fetchall() - current_state_id = all_states[0][0] - return current_state_id - - def all_possible_states(self, sample_id): - subsubquery = select(columns=[self.sample_table.c.request_id], - whereclause=self.sample_table.c.id==sample_id) - self.request_id = subsubquery.execute().fetchall()[0][0] - #log.debug('REQUESTID: %i' % self.request_id) - subquery = select(columns=[self.request_table.c.request_type_id], - whereclause=self.request_table.c.id==self.request_id) - request_type_id = subquery.execute().fetchall()[0][0] - #log.debug('REQUESTTYPEID: %i' % request_type_id) - query = select(columns=[self.state_table.c.id, self.state_table.c.name], - whereclause=self.state_table.c.request_type_id==request_type_id, - order_by=self.state_table.c.id.asc()) - states = query.execute().fetchall() - #log.debug('POSSIBLESTATES: '+ str(states)) - return states - - def change_state(self, sample_id, new_state=None): - ''' - This method changes the state of the sample to the the 'new_state' - ''' - if not new_state: - return - new_state_id = -1 - # find the state_id for this new state in the list of possible states - possible_states = self.all_possible_states(sample_id) - for state_id, state_name in possible_states: - if new_state == state_name: - new_state_id = state_id - if new_state_id == -1: - return - #log.debug('Updating sample_id %i state to %s' % (sample_id, new_state)) - i = self.event_table.insert() - i.execute(update_time=datetime.utcnow(), - create_time=datetime.utcnow(), - sample_id=sample_id, - sample_state_id=int(new_state_id), - comment='Update by barcode scan') - # if all the samples for this request are in the final state - # then change the request state to 'Complete' - result = select(columns=[self.sample_table.c.id], - whereclause=self.sample_table.c.request_id==self.request_id).execute() - sample_id_list = result.fetchall() - request_complete = True - for sid in sample_id_list: - current_state_id = self.current_state(sid[0]) - if current_state_id != possible_states[-1][0]: - request_complete = False - break - if request_complete: - request_state = 'Complete' - #log.debug('Updating request_id %i state to "%s"' % (self.request_id, request_state)) - i = self.request_event_table.insert() - i.execute(update_time=datetime.utcnow(), - create_time=datetime.utcnow(), - request_id=self.request_id, - state=request_state, - comment='All samples of this request have finished processing.') - - def set_sample_dataset_status(self, id, new_status, msg=None): - u = self.sample_dataset_table.update(whereclause=self.sample_dataset_table.c.id==int(id)) - u.execute(status=new_status) - if new_status == 'Error': - u.execute(error_msg=msg) - else: - u.execute(error_msg='') - return - - - -if __name__ == '__main__': - print '''This file should not be run directly. To start the Galaxy AMQP Listener: - %sh run_galaxy_listener.sh''' - dbstr = 'postgres://postgres:postgres@localhost/g2' - - parser = optparse.OptionParser() - parser.add_option('-n', '--name', help='name of the sample field', dest='name', \ - action='store', default='bar_code') - parser.add_option('-v', '--value', help='value of the sample field', dest='value', \ - action='store') - parser.add_option('-s', '--state', help='new state of the sample', dest='state', \ - action='store') - (opts, args) = parser.parse_args() - - gs = GalaxyDbInterface(dbstr) - sample_id = gs.get_sample_id(field_name=opts.name, value=opts.value) - gs.change_state(sample_id, opts.state) - - - diff -r b4924e433ae2df18771a7e43c23287f4b9bc23dd -r 9b8d3dae67427121793b2390baca699f2887d600 scripts/galaxy_messaging/server/setup_rabbitmq.py --- a/scripts/galaxy_messaging/server/setup_rabbitmq.py +++ /dev/null @@ -1,38 +0,0 @@ -# -# Configuration script for RabbitMQ in Galaxy -# -# Requirements: -# - set the rabbitmqctl_path variable in the 'galaxy_amqp' section in Galaxy config file -# -# Usage: -# $ python setup_rabbitmq.py <Galaxy config file> -# - -import os, sys, csv, ConfigParser - -def main( config_file ): - try: - config = ConfigParser.ConfigParser() - config.read( config_file ) - rabbitmqctl_path = config.get( 'galaxy_amqp', 'rabbitmqctl_path' ) - username = config.get( 'galaxy_amqp', 'userid' ) - password = config.get( 'galaxy_amqp', 'password' ) - virtual_host = config.get( 'galaxy_amqp', 'virtual_host' ) - except Exception, e: - print 'Fatal error:', str(e) - sys.exit(1) - - cmd_list = [ - 'add_user %s %s' % ( username, password ), - 'add_vhost %s' % virtual_host, - 'set_permissions -p %s %s ".*" ".*" ".*"' % ( virtual_host, username ) - ] - - for cmd in cmd_list: - retval = os.system( rabbitmqctl_path + ' ' + cmd ) - if retval: - print "Failed command: %s" % cmd - sys.exit(1) - -if __name__ == '__main__': - main( sys.argv[1] ) diff -r b4924e433ae2df18771a7e43c23287f4b9bc23dd -r 9b8d3dae67427121793b2390baca699f2887d600 scripts/galaxy_messaging/server/xml_helper.py --- a/scripts/galaxy_messaging/server/xml_helper.py +++ /dev/null @@ -1,28 +0,0 @@ -#======= XML helper methods ==================================================== - -import xml.dom.minidom - -def get_value( dom, tag_name ): - ''' - This method extracts the tag value from the xml message - ''' - nodelist = dom.getElementsByTagName( tag_name )[ 0 ].childNodes - rc = "" - for node in nodelist: - if node.nodeType == node.TEXT_NODE: - rc = rc + node.data - return rc - -def get_value_index( dom, tag_name, dataset_id ): - ''' - This method extracts the tag value from the xml message - ''' - try: - nodelist = dom.getElementsByTagName( tag_name )[ dataset_id ].childNodes - except: - return None - rc = "" - for node in nodelist: - if node.nodeType == node.TEXT_NODE: - rc = rc + node.data - return rc diff -r b4924e433ae2df18771a7e43c23287f4b9bc23dd -r 9b8d3dae67427121793b2390baca699f2887d600 setup_rabbitmq.sh --- a/setup_rabbitmq.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/sh - -cd `dirname $0` -python scripts/galaxy_messaging/server/setup_rabbitmq.py universe_wsgi.ini \ No newline at end of file Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.