# HG changeset patch -- Bitbucket.org # Project galaxy-dist # URL http://bitbucket.org/galaxy/galaxy-dist/overview # User rc # Date 1288101623 14400 # Node ID 677bcc65f74f5a460277f089d0cdebf170f02831 # Parent 9e27a8ad881dc4451add6d518468a73dbc9028bc sample_tracking: - added web api methods to update requests, samples using restful api - removed the old ad-doc web api - modified the data transfer code to use this api - code cleanup --- a/scripts/galaxy_messaging/server/galaxyweb_interface.py +++ /dev/null @@ -1,122 +0,0 @@ -import ConfigParser -import sys, os -import array -import time -import optparse,array -import shutil, traceback -import urllib,urllib2, cookielib - -assert sys.version_info[:2] >= ( 2, 4 ) -new_path = [ os.path.join( os.getcwd(), "lib" ) ] -new_path.extend( sys.path[1:] ) # remove scripts/ from the path -sys.path = new_path - -from galaxy import eggs -import pkg_resources - -import pkg_resources -pkg_resources.require( "pycrypto" ) - -from Crypto.Cipher import Blowfish -from Crypto.Util.randpool import RandomPool -from Crypto.Util import number - - -class GalaxyWebInterface(object): - def __init__(self, server_host, server_port, datatx_email, datatx_password, config_id_secret): - self.server_host = server_host - self.server_port = server_port - self.datatx_email = datatx_email - self.datatx_password = datatx_password - self.config_id_secret = config_id_secret - # create url - self.base_url = "http://%s:%s" % (self.server_host, self.server_port) - # login - url = "%s/user/login?email=%s&password=%s&login_button=Login" % (self.base_url, self.datatx_email, self.datatx_password) - cj = cookielib.CookieJar() - self.opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj)) - #print url - f = self.opener.open(url) - if f.read().find("ogged in as "+self.datatx_email) == -1: - # if the user doesnt exist, create the user - url = "%s/user/create?email=%s&username=%s&password=%s&confirm=%s&create_user_button=Submit" % ( self.base_url, self.datatx_email, self.datatx_email, self.datatx_password, self.datatx_password ) - f = self.opener.open(url) - if f.read().find("ogged in as "+self.datatx_email) == -1: - raise Exception("The "+self.datatx_email+" user could not login to Galaxy") - - def add_to_library(self, server_dir, library_id, folder_id, dbkey=''): - ''' - This method adds the dataset file to the target data library & folder - by opening the corresponding url in Galaxy server running. - ''' - params = urllib.urlencode(dict( cntrller='library_admin', - tool_id='upload1', - tool_state='None', - library_id=self.encode_id(library_id), - folder_id=self.encode_id(folder_id), - upload_option='upload_directory', - file_type='auto', - server_dir=os.path.basename(server_dir), - dbkey=dbkey, - show_dataset_id='True', - runtool_btn='Upload to library')) - url = self.base_url+"/library_common/upload_library_dataset" - print url - print params - try: - f = self.opener.open(url, params) - if f.read().find("Data Library") == -1: - raise Exception("Dataset could not be uploaded to the data library. URL: %s, PARAMS=%s" % (url, params)) - except: - return 'ERROR', url, params - - def import_to_history(self, ldda_id, library_id, folder_id): - params = urllib.urlencode(dict( cntrller='library_admin', - show_deleted='False', - library_id=self.encode_id(library_id), - folder_id=self.encode_id(folder_id), - ldda_ids=self.encode_id(ldda_id), - do_action='import_to_history', - use_panels='False')) - #url = "http://lion.bx.psu.edu:8080/library_common/act_on_multiple_datasets?library_id=adb5f5c93f827949&show_deleted=False&ldda_ids=adb5f5c93f827949&cntrller=library_admin&do_action=import_to_history&use_panels=False" - url = self.base_url+"/library_common/act_on_multiple_datasets" - f = self.opener.open(url, params) - x = f.read() - if x.find("1 dataset(s) have been imported into your history.") == -1: - raise Exception("Dataset could not be imported into history") - - def run_workflow(self, workflow_id, hid, workflow_step): - input = str(workflow_step)+'|input' - params = urllib.urlencode({'id':self.encode_id(workflow_id), - 'run_workflow': 'Run workflow', - input: hid}) - url = self.base_url+"/workflow/run" - f = self.opener.open(url, params) - - def logout(self): - # finally logout - f = self.opener.open(self.base_url+'/user/logout') - - def encode_id(self, obj_id ): - id_cipher = Blowfish.new( self.config_id_secret ) - # Convert to string - s = str( obj_id ) - # Pad to a multiple of 8 with leading "!" - s = ( "!" * ( 8 - len(s) % 8 ) ) + s - # Encrypt - return id_cipher.encrypt( s ).encode( 'hex' ) - - def update_request_state(self, request_id): - params = urllib.urlencode(dict( cntrller='requests_admin', - request_id=request_id)) - url = self.base_url + "/requests_common/update_request_state" - f = self.opener.open(url, params) - print url - print params - x = f.read() - - - - - - --- /dev/null +++ b/scripts/api/requests_update_state.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python + + +import os, sys, traceback +sys.path.insert( 0, os.path.dirname( __file__ ) ) +from common import update + +try: + data = {} + data[ 'update_type' ] = 'request_state' +except IndexError: + print 'usage: %s key url' % os.path.basename( sys.argv[0] ) + sys.exit( 1 ) + +update( sys.argv[1], sys.argv[2], data, return_formatted=True ) + --- a/lib/galaxy/web/buildapp.py +++ b/lib/galaxy/web/buildapp.py @@ -106,6 +106,7 @@ def app_factory( global_conf, **kwargs ) add_api_controllers( webapp, app ) webapp.api_mapper.resource( 'content', 'contents', path_prefix='/api/libraries/:library_id', parent_resources=dict( member_name='library', collection_name='libraries' ) ) webapp.api_mapper.resource( 'library', 'libraries', path_prefix='/api' ) + webapp.api_mapper.resource( 'request', 'requests', path_prefix='/api' ) webapp.finalize_config() # Wrap the webapp in some useful middleware if kwargs.get( 'middleware', True ): --- /dev/null +++ b/scripts/api/sample_dataset_update_status.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python + + +import os, sys, traceback +sys.path.insert( 0, os.path.dirname( __file__ ) ) +from common import display +from common import submit +from common import update +try: + data = {} + data[ 'update_type' ] = 'sample_dataset_transfer_status' + data[ 'sample_dataset_ids' ] = sys.argv[3].split(',') + data[ 'new_status' ] = sys.argv[4] +except IndexError: + print 'usage: %s key url sample_dataset_ids new_state [error msg]' % os.path.basename( sys.argv[0] ) + sys.exit( 1 ) +try: + data[ 'error_msg' ] = sys.argv[5] +except IndexError: + data[ 'error_msg' ] = '' +print data +update( sys.argv[1], sys.argv[2], data, return_formatted=True ) --- a/scripts/galaxy_messaging/server/amqp_consumer.py +++ b/scripts/galaxy_messaging/server/amqp_consumer.py @@ -14,24 +14,22 @@ import sys, os import optparse import xml.dom.minidom import subprocess +import urllib2 from galaxydb_interface import GalaxyDbInterface -new_path = [ os.path.join( os.getcwd(), "scripts/galaxy_messaging/server" ) ] -new_path.extend( sys.path[1:] ) # remove scripts/ from the path -sys.path = new_path -from galaxyweb_interface import GalaxyWebInterface +api_path = [ os.path.join( os.getcwd(), "scripts/api" ) ] +sys.path.extend( api_path ) +import common as api assert sys.version_info[:2] >= ( 2, 4 ) new_path = [ os.path.join( os.getcwd(), "lib" ) ] new_path.extend( sys.path[1:] ) # remove scripts/ from the path sys.path = new_path - - from galaxy import eggs +from galaxy.web.api.requests import RequestsController import pkg_resources pkg_resources.require( "amqplib" ) - from amqplib import client_0_8 as amqp import logging @@ -43,7 +41,12 @@ formatter = logging.Formatter("%(asctime fh.setFormatter(formatter) log.addHandler(fh) +# data transfer script +data_transfer_script = os.path.join( os.getcwd(), + "scripts/galaxy_messaging/server/data_transfer.py" ) + global dbconnstr +global webconfig global config def get_value(dom, tag_name): @@ -71,84 +74,98 @@ def get_value_index(dom, tag_name, index rc = rc + node.data return rc -def recv_callback(msg): - global config - global webconfig +def start_data_transfer(message): + # fork a new process to transfer datasets + cmd = '%s "%s" "%s" "%s"' % ( "python", + data_transfer_script, + message.body, + sys.argv[1] ) # Galaxy config file name + pid = subprocess.Popen(cmd, shell=True).pid + log.debug('Started process (%i): %s' % (pid, str(cmd))) + +def update_sample_state(message): + dom = xml.dom.minidom.parseString(message.body) + barcode = get_value(dom, 'barcode') + state = get_value(dom, 'state') + log.debug('Barcode: ' + barcode) + log.debug('State: ' + state) + # update the sample state in Galaxy db + galaxydb = GalaxyDbInterface(dbconnstr) + sample_id = galaxydb.get_sample_id(field_name='bar_code', value=barcode) + if sample_id == -1: + log.debug('Invalid barcode.') + return + galaxydb.change_state(sample_id, state) + # after updating the sample state, update request status + request_id = galaxydb.get_request_id(sample_id) + update_request( request_id ) + +def update_request( request_id ): + http_server_section = webconfig.get( "universe_wsgi_config", "http_server_section" ) + encoded_request_id = api.encode_id( config.get( "app:main", "id_secret" ), request_id ) + api_key = webconfig.get( "data_transfer_user_login_info", "api_key" ) + data = dict( update_type=RequestsController.update_types.REQUEST ) + url = "http://%s:%s/api/requests/%s" % ( config.get(http_server_section, "host"), + config.get(http_server_section, "port"), + encoded_request_id ) + log.debug( 'Updating request %i' % request_id ) + try: + api.update( api_key, url, data, return_formatted=False ) + except urllib2.URLError, e: + log.debug( 'ERROR(update_request (%s)): %s' % ( str((self.api_key, url, data)), str(e) ) ) + +def recv_callback(message): # check the meesage type. - msg_type = msg.properties['application_headers'].get('msg_type') - log.debug('\nMESSAGE RECVD: '+str(msg_type)) + msg_type = message.properties['application_headers'].get('msg_type') + log.debug( 'MESSAGE RECVD: ' + str( msg_type ) ) if msg_type == 'data_transfer': - log.debug('DATA TRANSFER') - # fork a new process to transfer datasets - transfer_script = os.path.join(os.getcwd(), - "scripts/galaxy_messaging/server/data_transfer.py") - cmd = '%s "%s" "%s" "%s"' % ("python", - transfer_script, - msg.body, - config.get("app:main", "id_secret") ) - pid = subprocess.Popen(cmd, shell=True).pid - log.debug('Started process (%i): %s' % (pid, str(cmd))) + log.debug( 'DATA TRANSFER' ) + start_data_transfer( message ) elif msg_type == 'sample_state_update': - log.debug('SAMPLE STATE UPDATE') - dom = xml.dom.minidom.parseString(msg.body) - barcode = get_value(dom, 'barcode') - state = get_value(dom, 'state') - log.debug('Barcode: '+barcode) - log.debug('State: '+state) - # update the galaxy db - galaxydb = GalaxyDbInterface(dbconnstr) - sample_id = galaxydb.get_sample_id(field_name='bar_code', value=barcode) - if sample_id == -1: - log.debug('Invalid barcode.') - return - galaxydb.change_state(sample_id, state) - # update the request state - galaxyweb = GalaxyWebInterface(webconfig.get("universe_wsgi_config", "host"), - webconfig.get("universe_wsgi_config", "port"), - webconfig.get("data_transfer_user_login_info", "email"), - webconfig.get("data_transfer_user_login_info", "password"), - config.get("app:main", "id_secret")) - galaxyweb.update_request_state(galaxydb.get_request_id(sample_id)) - galaxyweb.logout() + log.debug( 'SAMPLE STATE UPDATE' ) + update_sample_state( message ) def main(): if len(sys.argv) < 2: - print 'Usage: python amqp_consumer.py <Galaxy config file>' + print 'Usage: python amqp_consumer.py <Galaxy configuration file>' return global config config = ConfigParser.ConfigParser() - config.read(sys.argv[1]) + config.read( sys.argv[1] ) global dbconnstr dbconnstr = config.get("app:main", "database_connection") amqp_config = {} for option in config.options("galaxy_amqp"): amqp_config[option] = config.get("galaxy_amqp", option) - log.debug(str(amqp_config)) + log.debug("PID: " + str(os.getpid()) + ", " + str(amqp_config)) # web server config global webconfig webconfig = ConfigParser.ConfigParser() webconfig.read('transfer_datasets.ini') - - - - - + # connect conn = amqp.Connection(host=amqp_config['host']+":"+amqp_config['port'], userid=amqp_config['userid'], password=amqp_config['password'], virtual_host=amqp_config['virtual_host'], insist=False) chan = conn.channel() - chan.queue_declare(queue=amqp_config['queue'], durable=True, exclusive=True, auto_delete=False) - chan.exchange_declare(exchange=amqp_config['exchange'], type="direct", durable=True, auto_delete=False,) - chan.queue_bind(queue=amqp_config['queue'], - exchange=amqp_config['exchange'], - routing_key=amqp_config['routing_key']) + chan.queue_declare( queue=amqp_config['queue'], + durable=True, + exclusive=True, + auto_delete=False) + chan.exchange_declare( exchange=amqp_config['exchange'], + type="direct", + durable=True, + auto_delete=False,) + chan.queue_bind( queue=amqp_config['queue'], + exchange=amqp_config['exchange'], + routing_key=amqp_config['routing_key']) - chan.basic_consume(queue=amqp_config['queue'], - no_ack=True, - callback=recv_callback, - consumer_tag="testtag") + chan.basic_consume( queue=amqp_config['queue'], + no_ack=True, + callback=recv_callback, + consumer_tag="testtag") + log.debug('Connected to rabbitmq server - '+amqp_config['host']+":"+amqp_config['port']) while True: chan.wait() chan.basic_cancel("testtag") @@ -157,3 +174,5 @@ def main(): if __name__ == '__main__': main() + + --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -1584,6 +1584,7 @@ class Request( object ): SUBMITTED = 'In Progress', REJECTED = 'Rejected', COMPLETE = 'Complete' ) + api_collection_visible_keys = ( 'id', 'name', 'state' ) def __init__( self, name=None, desc=None, request_type=None, user=None, form_values=None, notification=None ): self.name = name self.desc = desc @@ -1663,7 +1664,7 @@ class Request( object ): # Send email if trans.app.config.smtp_server is not None and self.notification and self.notification[ 'email' ]: host = trans.request.host.split( ':' )[0] - if host in [ 'localhost', '127.0.0.1' ]: + if host in [ 'localhost', '127.0.0.1', '0.0.0.0' ]: host = socket.getfqdn() body = """ Galaxy Sample Tracking Notification @@ -1704,8 +1705,8 @@ All samples in state: %(sample_state s.sendmail( frm, to, message ) s.quit() comments = "Email notification sent to %s." % ", ".join( to ).strip().strip( ',' ) - except: - comments = "Email notification failed." + except Exception,e: + comments = "Email notification failed. (%s)" % str(e) # update the request history with the email notification event elif not trans.app.config.smtp_server: comments = "Email notification failed as SMTP server not set in config file" @@ -1714,6 +1715,18 @@ All samples in state: %(sample_state trans.sa_session.add( event ) trans.sa_session.flush() return comments + def get_api_value( self, view='collection' ): + rval = {} + try: + visible_keys = self.__getattribute__( 'api_' + view + '_visible_keys' ) + except AttributeError: + raise Exception( 'Unknown API view: %s' % view ) + for key in visible_keys: + try: + rval[key] = self.__getattribute__( key ) + except AttributeError: + rval[key] = None + return rval class RequestEvent( object ): def __init__(self, request=None, request_state=None, comment=''): @@ -1734,7 +1747,7 @@ class RequestType( object ): self.sample_form = sample_form self.datatx_info = datatx_info @property - def state( self ): + def final_sample_state( self ): # The states mapper for this object orders ascending return self.states[-1] @@ -1747,12 +1760,6 @@ class RequestTypePermissions( object ): class Sample( object ): bulk_operations = Bunch( CHANGE_STATE = 'Change state', SELECT_LIBRARY = 'Select data library and folder' ) - transfer_status = Bunch( NOT_STARTED = 'Not started', - IN_QUEUE = 'In queue', - TRANSFERRING = 'Transferring dataset', - ADD_TO_LIBRARY = 'Adding to data library', - COMPLETE = 'Complete', - ERROR = 'Error' ) def __init__(self, name=None, desc=None, request=None, form_values=None, bar_code=None, library=None, folder=None): self.name = name self.desc = desc @@ -1776,21 +1783,21 @@ class Sample( object ): def untransferred_dataset_files( self ): untransferred_datasets = [] for dataset in self.datasets: - if dataset.status == self.transfer_status.NOT_STARTED: + if dataset.status == SampleDataset.transfer_status.NOT_STARTED: untransferred_datasets.append( dataset ) return untransferred_datasets @property def inprogress_dataset_files( self ): inprogress_datasets = [] for dataset in self.datasets: - if dataset.status not in [ self.transfer_status.NOT_STARTED, self.transfer_status.COMPLETE ]: + if dataset.status not in [ SampleDataset.transfer_status.NOT_STARTED, SampleDataset.transfer_status.COMPLETE ]: inprogress_datasets.append( dataset ) return inprogress_datasets @property def transferred_dataset_files( self ): transferred_datasets = [] for dataset in self.datasets: - if dataset.status == self.transfer_status.COMPLETE: + if dataset.status == SampleDataset.transfer_status.COMPLETE: transferred_datasets.append( dataset ) return transferred_datasets def dataset_size( self, filepath ): @@ -1818,6 +1825,12 @@ class SampleEvent( object ): self.comment = comment class SampleDataset( object ): + transfer_status = Bunch( NOT_STARTED = 'Not started', + IN_QUEUE = 'In queue', + TRANSFERRING = 'Transferring dataset', + ADD_TO_LIBRARY = 'Adding to data library', + COMPLETE = 'Complete', + ERROR = 'Error' ) def __init__(self, sample=None, name=None, file_path=None, status=None, error_msg=None, size=None): self.sample = sample --- a/lib/galaxy/web/controllers/requests_common.py +++ b/lib/galaxy/web/controllers/requests_common.py @@ -275,7 +275,7 @@ class RequestsCommon( BaseController, Us request_type = request.type name = util.restore_text( params.get( 'name', '' ) ) desc = util.restore_text( params.get( 'desc', '' ) ) - notification = dict( email=[ user.email ], sample_states=[ request_type.state.id ], body='', subject='' ) + notification = dict( email=[ user.email ], sample_states=[ request_type.final_sample_state.id ], body='', subject='' ) values = [] for index, field in enumerate( request_type.request_form.fields ): field_type = field[ 'type' ] @@ -416,11 +416,7 @@ class RequestsCommon( BaseController, Us elif params.get( 'change_state_button', False ): sample_event_comment = util.restore_text( params.get( 'sample_event_comment', '' ) ) new_state = trans.sa_session.query( trans.model.SampleState ).get( trans.security.decode_id( sample_state_id ) ) - for sample_id in selected_samples: - sample = trans.sa_session.query( trans.model.Sample ).get( trans.security.decode_id( sample_id ) ) - event = trans.model.SampleEvent( sample, new_state, sample_event_comment ) - trans.sa_session.add( event ) - trans.sa_session.flush() + self.update_sample_state(trans, cntrller, selected_samples, new_state, comment=sample_event_comment ) return trans.response.send_redirect( web.url_for( controller='requests_common', cntrller=cntrller, action='update_request_state', @@ -486,6 +482,22 @@ class RequestsCommon( BaseController, Us status=status, message=message ) @web.expose + def update_sample_state(self, trans, cntrller, sample_ids, new_state, comment=None ): + for sample_id in sample_ids: + try: + sample = trans.sa_session.query( trans.model.Sample ).get( trans.security.decode_id( sample_id ) ) + except: + if cntrller == 'api': + trans.response.status = 400 + return "Malformed sample id ( %s ) specified, unable to decode." % str( sample_id ) + else: + return invalid_id_redirect( trans, cntrller, sample_id ) + event = trans.model.SampleEvent( sample, new_state, comment ) + trans.sa_session.add( event ) + trans.sa_session.flush() + if cntrller == 'api': + return 200, 'Done' + @web.expose @web.require_login( "delete sequencing requests" ) def delete_request( self, trans, cntrller, **kwd ): params = util.Params( kwd ) @@ -640,6 +652,8 @@ class RequestsCommon( BaseController, Us event = trans.model.RequestEvent( request, request.states.SUBMITTED, message ) trans.sa_session.add( event ) trans.sa_session.flush() + if cntrller == 'api': + return 200, message return trans.response.send_redirect( web.url_for( controller='requests_common', action='manage_request', cntrller=cntrller, @@ -647,7 +661,7 @@ class RequestsCommon( BaseController, Us status=status, message=message ) ) final_state = False - request_type_state = request.type.state + request_type_state = request.type.final_sample_state if common_state.id == request_type_state.id: # since all the samples are in the final state, change the request state to 'Complete' comments = "All samples of this request are in the last sample state (%s). " % request_type_state.name @@ -656,7 +670,7 @@ class RequestsCommon( BaseController, Us else: comments = "All samples are in %s state. " % common_state.name state = request.states.SUBMITTED - event = trans.model.RequestEvent(request, state, comments) + event = trans.model.RequestEvent( request, state, comments ) trans.sa_session.add( event ) trans.sa_session.flush() # check if an email notification is configured to be sent when the samples @@ -666,6 +680,8 @@ class RequestsCommon( BaseController, Us message = comments + retval else: message = comments + if cntrller == 'api': + return 200, message return trans.response.send_redirect( web.url_for( controller='requests_common', action='manage_request', cntrller=cntrller, --- a/scripts/galaxy_messaging/server/data_transfer.py +++ b/scripts/galaxy_messaging/server/data_transfer.py @@ -19,25 +19,26 @@ import urllib,urllib2, cookielib, shutil import logging, time, datetime import xml.dom.minidom -sp = sys.path[0] +log = logging.getLogger("datatx_"+str(os.getpid())) +log.setLevel(logging.DEBUG) +fh = logging.FileHandler("data_transfer.log") +fh.setLevel(logging.DEBUG) +formatter = logging.Formatter("%(asctime)s - %(name)s - %(message)s") +fh.setFormatter(formatter) +log.addHandler(fh) -from galaxydb_interface import GalaxyDbInterface - -assert sys.version_info[:2] >= ( 2, 4 ) -new_path = [ sp ] -new_path.extend( sys.path ) -sys.path = new_path - -from galaxyweb_interface import GalaxyWebInterface +api_path = [ os.path.join( os.getcwd(), "scripts/api" ) ] +sys.path.extend( api_path ) +import common as api assert sys.version_info[:2] >= ( 2, 4 ) new_path = [ os.path.join( os.getcwd(), "lib" ) ] new_path.extend( sys.path[1:] ) # remove scripts/ from the path sys.path = new_path - from galaxy.util.json import from_json_string, to_json_string -from galaxy.model import Sample +from galaxy.model import SampleDataset +from galaxy.web.api.requests import RequestsController from galaxy import eggs import pkg_resources pkg_resources.require( "pexpect" ) @@ -46,28 +47,19 @@ import pexpect pkg_resources.require( "simplejson" ) import simplejson -log = logging.getLogger("datatx_"+str(os.getpid())) -log.setLevel(logging.DEBUG) -fh = logging.FileHandler("data_transfer.log") -fh.setLevel(logging.DEBUG) -formatter = logging.Formatter("%(asctime)s - %(name)s - %(message)s") -fh.setFormatter(formatter) -log.addHandler(fh) - - class DataTransfer(object): - def __init__(self, msg, config_id_secret): + def __init__(self, msg, config_file): log.info(msg) self.dom = xml.dom.minidom.parseString(msg) - self.host = self.get_value(self.dom, 'data_host') - self.username = self.get_value(self.dom, 'data_user') - self.password = self.get_value(self.dom, 'data_password') + self.sequencer_host = self.get_value(self.dom, 'data_host') + self.sequencer_username = self.get_value(self.dom, 'data_user') + self.sequencer_password = self.get_value(self.dom, 'data_password') + self.request_id = self.get_value(self.dom, 'request_id') self.sample_id = self.get_value(self.dom, 'sample_id') self.library_id = self.get_value(self.dom, 'library_id') self.folder_id = self.get_value(self.dom, 'folder_id') self.dataset_files = [] - self.config_id_secret = config_id_secret count=0 while True: dataset_id = self.get_value_index(self.dom, 'dataset_id', count) @@ -82,21 +74,25 @@ class DataTransfer(object): count=count+1 try: # Retrieve the upload user login information from the config file + transfer_datasets_config = ConfigParser.ConfigParser() + transfer_datasets_config.read('transfer_datasets.ini') + self.data_transfer_user_email = transfer_datasets_config.get("data_transfer_user_login_info", "email") + self.data_transfer_user_password = transfer_datasets_config.get("data_transfer_user_login_info", "password") + self.api_key = transfer_datasets_config.get("data_transfer_user_login_info", "api_key") + self.http_server_section = transfer_datasets_config.get("universe_wsgi_config", "http_server_section") config = ConfigParser.ConfigParser() - config.read('transfer_datasets.ini') - self.datatx_email = config.get("data_transfer_user_login_info", "email") - self.datatx_password = config.get("data_transfer_user_login_info", "password") - self.server_host = config.get("universe_wsgi_config", "host") - self.server_port = config.get("universe_wsgi_config", "port") - self.database_connection = config.get("universe_wsgi_config", "database_connection") - self.import_dir = config.get("universe_wsgi_config", "library_import_dir") + config.read( config_file ) + self.server_host = config.get(self.http_server_section, "host") + self.server_port = config.get(self.http_server_section, "port") + self.database_connection = config.get("app:main", "database_connection") + self.import_dir = config.get("app:main", "library_import_dir") + self.config_id_secret = config.get("app:main", "id_secret") + # create the destination directory within the import directory self.server_dir = os.path.join( self.import_dir, 'datatx_'+str(os.getpid())+'_'+datetime.date.today().strftime("%d%b%Y") ) os.mkdir(self.server_dir) if not os.path.exists(self.server_dir): raise Exception - # connect to db - self.galaxydb = GalaxyDbInterface(self.database_connection) except: log.error(traceback.format_exc()) log.error('FATAL ERROR') @@ -114,7 +110,7 @@ class DataTransfer(object): # add the dataset to the given library self.add_to_library() # update the data transfer status in the db - self.update_status(Sample.transfer_status.COMPLETE) + self.update_status(SampleDataset.transfer_status.COMPLETE) # cleanup #self.cleanup() sys.exit(0) @@ -149,15 +145,15 @@ class DataTransfer(object): def print_ticks(d): pass for i, df in enumerate(self.dataset_files): - self.update_status(Sample.transfer_status.TRANSFERRING, df['dataset_id']) + self.update_status(SampleDataset.transfer_status.TRANSFERRING, df['dataset_id']) try: - cmd = "scp %s@%s:'%s' '%s/%s'" % ( self.username, - self.host, - df['file'].replace(' ', '\ '), - self.server_dir.replace(' ', '\ '), - df['name'].replace(' ', '\ ')) + cmd = "scp %s@%s:'%s' '%s/%s'" % ( self.sequencer_username, + self.sequencer_host, + df['file'].replace(' ', '\ '), + self.server_dir.replace(' ', '\ '), + df['name'].replace(' ', '\ ') ) log.debug(cmd) - output = pexpect.run(cmd, events={'.ssword:*': self.password+'\r\n', + output = pexpect.run(cmd, events={'.ssword:*': self.sequencer_password+'\r\n', pexpect.TIMEOUT:print_ticks}, timeout=10) log.debug(output) @@ -176,17 +172,21 @@ class DataTransfer(object): This method adds the dataset file to the target data library & folder by opening the corresponding url in Galaxy server running. ''' + self.update_status(SampleDataset.transfer_status.ADD_TO_LIBRARY) try: - self.update_status(Sample.transfer_status.ADD_TO_LIBRARY) - log.debug("dir:%s, lib:%s, folder:%s" % (self.server_dir, str(self.library_id), str(self.folder_id))) - galaxyweb = GalaxyWebInterface(self.server_host, self.server_port, - self.datatx_email, self.datatx_password, - self.config_id_secret) - retval = galaxyweb.add_to_library(self.server_dir, self.library_id, self.folder_id) - log.debug(str(retval)) - galaxyweb.logout() - except Exception, e: - log.debug(e) + data = {} + data[ 'folder_id' ] = api.encode_id( self.config_id_secret, '%s.%s' % ( 'folder', self.folder_id ) ) + data[ 'file_type' ] = 'auto' + data[ 'server_dir' ] = self.server_dir + data[ 'dbkey' ] = '' + data[ 'upload_option' ] = 'upload_directory' + data[ 'create_type' ] = 'file' + url = "http://%s:%s/api/libraries/%s/contents" % ( self.server_host, + self.server_port, + api.encode_id( self.config_id_secret, self.library_id ) ) + log.debug(str((self.api_key, url, data))) + retval = api.submit( self.api_key, url, data, return_formatted=False ) + except: self.error_and_exit(str(e)) def update_status(self, status, dataset_id='All', msg=''): @@ -195,12 +195,27 @@ class DataTransfer(object): ''' try: log.debug('Setting status "%s" for dataset "%s" of sample "%s"' % ( status, str(dataset_id), str(self.sample_id) ) ) + sample_dataset_ids = [] if dataset_id == 'All': for dataset in self.dataset_files: - self.galaxydb.set_sample_dataset_status(dataset['dataset_id'], status, msg) + sample_dataset_ids.append( api.encode_id( self.config_id_secret, dataset['dataset_id'] ) ) else: - self.galaxydb.set_sample_dataset_status(dataset_id, status, msg) + sample_dataset_ids.append( api.encode_id( self.config_id_secret, dataset_id ) ) + # update the transfer status + data = {} + data[ 'update_type' ] = RequestsController.update_types.SAMPLE_DATASET + data[ 'sample_dataset_ids' ] = sample_dataset_ids + data[ 'new_status' ] = status + data[ 'error_msg' ] = msg + url = "http://%s:%s/api/requests/%s" % ( self.server_host, + self.server_port, + api.encode_id( self.config_id_secret, self.request_id ) ) + log.debug(str((self.api_key, url, data))) + retval = api.update( self.api_key, url, data, return_formatted=False ) log.debug('done.') + except urllib2.URLError, e: + log.debug( 'ERROR(sample_dataset_transfer_status (%s)): %s' % ( url, str(e) ) ) + log.error(traceback.format_exc()) except: log.error(traceback.format_exc()) log.error('FATAL ERROR') --- a/templates/admin/requests/rename_datasets.mako +++ b/templates/admin/requests/rename_datasets.mako @@ -32,7 +32,7 @@ <tbody> %for id in id_list: <% sample_dataset = trans.sa_session.query( trans.model.SampleDataset ).get( trans.security.decode_id( id ) ) %> - %if sample_dataset.status == trans.app.model.Sample.transfer_status.NOT_STARTED: + %if sample_dataset.status == trans.app.model.SampleDataset.transfer_status.NOT_STARTED: <tr><td><% rename_datasets_for_sample_select_field = build_rename_datasets_for_sample_select_field( trans, sample_dataset ) %> --- a/scripts/api/common.py +++ b/scripts/api/common.py @@ -10,6 +10,11 @@ import pkg_resources pkg_resources.require( "simplejson" ) import simplejson +pkg_resources.require( "pycrypto" ) +from Crypto.Cipher import Blowfish +from Crypto.Util.randpool import RandomPool +from Crypto.Util import number + def make_url( api_key, url, args=None ): # Adds the API Key to the URL if it's not already there. if args is None: @@ -36,7 +41,15 @@ def post( api_key, url, data ): req = urllib2.Request( url, headers = { 'Content-Type': 'application/json' }, data = simplejson.dumps( data ) ) return simplejson.loads( urllib2.urlopen( req ).read() ) -def display( api_key, url ): +def put( api_key, url, data ): + # Do the actual PUT + url = make_url( api_key, url ) + req = urllib2.Request( url, headers = { 'Content-Type': 'application/json' }, data = simplejson.dumps( data )) + req.get_method = lambda: 'PUT' + return simplejson.loads( urllib2.urlopen( req ).read() ) + + +def display( api_key, url, return_formatted=True ): # Sends an API GET request and acts as a generic formatter for the JSON response. try: r = get( api_key, url ) @@ -47,6 +60,8 @@ def display( api_key, url ): if type( r ) == unicode: print 'error: %s' % r return None + if not return_formatted: + return r elif type( r ) == list: # Response is a collection as defined in the REST style. print 'Collection Members' @@ -68,7 +83,7 @@ def display( api_key, url ): else: print 'response is unknown type: %s' % type( r ) -def submit( api_key, url, data ): +def submit( api_key, url, data, return_formatted=True ): # Sends an API POST request and acts as a generic formatter for the JSON response. # 'data' will become the JSON payload read by Galaxy. try: @@ -77,6 +92,8 @@ def submit( api_key, url, data ): print e print e.read( 1024 ) sys.exit( 1 ) + if not return_formatted: + return r print 'Response' print '--------' if type( r ) == list: @@ -96,3 +113,45 @@ def submit( api_key, url, data ): print i else: print r + +def update( api_key, url, data, return_formatted=True ): + # Sends an API PUT request and acts as a generic formatter for the JSON response. + # 'data' will become the JSON payload read by Galaxy. + try: + r = put( api_key, url, data ) + except urllib2.HTTPError, e: + print e + print e.read( 1024 ) + sys.exit( 1 ) + if not return_formatted: + return r + print 'Response' + print '--------' + if type( r ) == list: + # Currently the only implemented responses are lists of dicts, because + # submission creates some number of collection elements. + for i in r: + if type( i ) == dict: + if 'url' in i: + print i.pop( 'url' ) + else: + print '----' + if 'name' in i: + print ' name: %s' % i.pop( 'name' ) + for k, v in i.items(): + print ' %s: %s' % ( k, v ) + else: + print i + else: + print r + +# utility method to encode ID's +def encode_id( config_id_secret, obj_id ): + id_cipher = Blowfish.new( config_id_secret ) + # Convert to string + s = str( obj_id ) + # Pad to a multiple of 8 with leading "!" + s = ( "!" * ( 8 - len(s) % 8 ) ) + s + # Encrypt + return id_cipher.encrypt( s ).encode( 'hex' ) + --- a/lib/galaxy/web/controllers/requests_admin.py +++ b/lib/galaxy/web/controllers/requests_admin.py @@ -129,9 +129,9 @@ class DataTransferGrid( grids.Grid ): visible=False, filterable="standard" ) ) operations = [ - grids.GridOperation( "Start Transfer", allow_multiple=True, condition=( lambda item: item.status in [ model.Sample.transfer_status.NOT_STARTED ] ) ), - grids.GridOperation( "Rename", allow_multiple=True, allow_popup=False, condition=( lambda item: item.status in [ model.Sample.transfer_status.NOT_STARTED ] ) ), - grids.GridOperation( "Delete", allow_multiple=True, condition=( lambda item: item.status in [ model.Sample.transfer_status.NOT_STARTED ] ) ), + grids.GridOperation( "Start Transfer", allow_multiple=True, condition=( lambda item: item.status in [ model.SampleDataset.transfer_status.NOT_STARTED ] ) ), + grids.GridOperation( "Rename", allow_multiple=True, allow_popup=False, condition=( lambda item: item.status in [ model.SampleDataset.transfer_status.NOT_STARTED ] ) ), + grids.GridOperation( "Delete", allow_multiple=True, condition=( lambda item: item.status in [ model.SampleDataset.transfer_status.NOT_STARTED ] ) ), ] def apply_query_filter( self, trans, query, **kwd ): sample_id = kwd.get( 'sample_id', None ) @@ -506,7 +506,7 @@ class RequestsAdmin( BaseController, Use name = self.__dataset_name( sample, filepath.split( '/' )[-1] ) sample_dataset = trans.model.SampleDataset( sample=sample, file_path=filepath, - status=sample.transfer_status.NOT_STARTED, + status=trans.app.model.SampleDataset.transfer_status.NOT_STARTED, name=name, error_msg='', size=sample.dataset_size( filepath ) ) @@ -575,6 +575,7 @@ class RequestsAdmin( BaseController, Use <data_host>%(DATA_HOST)s</data_host><data_user>%(DATA_USER)s</data_user><data_password>%(DATA_PASSWORD)s</data_password> + <request_id>%(REQUEST_ID)s</request_id><sample_id>%(SAMPLE_ID)s</sample_id><library_id>%(LIBRARY_ID)s</library_id><folder_id>%(FOLDER_ID)s</folder_id> @@ -588,16 +589,17 @@ class RequestsAdmin( BaseController, Use </dataset>''' datasets = '' for sample_dataset in selected_sample_datasets: - if sample_dataset.status == sample.transfer_status.NOT_STARTED: + if sample_dataset.status == trans.app.model.SampleDataset.transfer_status.NOT_STARTED: datasets = datasets + dataset_xml % dict( ID=str( sample_dataset.id ), NAME=sample_dataset.name, FILE=sample_dataset.file_path ) - sample_dataset.status = sample.transfer_status.IN_QUEUE + sample_dataset.status = trans.app.model.SampleDataset.transfer_status.IN_QUEUE trans.sa_session.add( sample_dataset ) trans.sa_session.flush() data = xml % dict( DATA_HOST=datatx_info['host'], DATA_USER=datatx_info['username'], DATA_PASSWORD=datatx_info['password'], + REQUEST_ID=str(sample.request.id), SAMPLE_ID=str(sample.id), LIBRARY_ID=str(sample.library.id), FOLDER_ID=str(sample.folder.id), @@ -644,6 +646,25 @@ class RequestsAdmin( BaseController, Use sample_id=trans.security.encode_id( sample.id ), status=status, message=message) ) + @web.expose + def update_sample_dataset_status(self, trans, cntrller, sample_dataset_ids, new_status, error_msg=None ): + # check if the new status is a valid transfer status + possible_status_list = [v[1] for v in trans.app.model.SampleDataset.transfer_status.items()] + if new_status not in possible_status_list: + trans.response.status = 400 + return "The requested transfer status ( %s ) is not a valid transfer status." % new_status + for id in util.listify( sample_dataset_ids ): + try: + sd_id = trans.security.decode_id( id ) + sample_dataset = trans.sa_session.query( trans.app.model.SampleDataset ).get( sd_id ) + except: + trans.response.status = 400 + return "Invalid sample dataset id ( %s ) specified." % str( id ) + sample_dataset.status = new_status + sample_dataset.error_msg = error_msg + trans.sa_session.add( sample_dataset ) + trans.sa_session.flush() + return 200, 'Done' # Request Type Stuff @web.expose @web.require_admin --- a/scripts/galaxy_messaging/server/galaxydb_interface.py +++ b/scripts/galaxy_messaging/server/galaxydb_interface.py @@ -28,7 +28,7 @@ class GalaxyDbInterface(object): def __init__(self, dbstr): self.dbstr = dbstr self.db_engine = create_engine(self.dbstr) - self.db_engine.echo = True + self.db_engine.echo = False self.metadata = MetaData(self.db_engine) self.session = sessionmaker(bind=self.db_engine) self.event_table = Table('sample_event', self.metadata, autoload=True ) --- /dev/null +++ b/scripts/api/sample_update_state.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python + + +import os, sys, traceback +sys.path.insert( 0, os.path.dirname( __file__ ) ) +from common import display +from common import submit +from common import update + +try: + data = {} + data[ 'update_type' ] = 'sample_state' + data[ 'sample_ids' ] = sys.argv[3].split(',') + data[ 'new_state' ] = sys.argv[4] +except IndexError: + print 'usage: %s key url sample_ids new_state [comment]' % os.path.basename( sys.argv[0] ) + sys.exit( 1 ) +try: + data[ 'comment' ] = sys.argv[5] +except IndexError: + data[ 'comment' ] = '' + +update( sys.argv[1], sys.argv[2], data, return_formatted=True ) --- /dev/null +++ b/lib/galaxy/web/api/requests.py @@ -0,0 +1,130 @@ +""" +API operations on a sample tracking system. +""" +import logging, os, string, shutil, urllib, re, socket +from cgi import escape, FieldStorage +from galaxy import util, datatypes, jobs, web, util +from galaxy.web.base.controller import * +from galaxy.util.sanitize_html import sanitize_html +from galaxy.model.orm import * +from galaxy.util.bunch import Bunch + +log = logging.getLogger( __name__ ) + +class RequestsController( BaseController ): + update_types = Bunch( REQUEST = 'request_state', + SAMPLE = 'sample_state', + SAMPLE_DATASET = 'sample_dataset_transfer_status' ) + update_type_values = [v[1] for v in update_types.items()] + @web.expose_api + def index( self, trans, **kwd ): + """ + GET /api/requests + Displays a collection (list) of sequencing requests. + """ + query = trans.sa_session.query( trans.app.model.Request )\ + .filter( and_( trans.app.model.Request.table.c.user_id == trans.user.id \ + and trans.app.model.Request.table.c.deleted == False ) ) \ + .all() + rval = [] + for request in query: + item = request.get_api_value() + item['url'] = url_for( 'requests', id=trans.security.encode_id( request.id ) ) + item['id'] = trans.security.encode_id( item['id'] ) + rval.append( item ) + return rval + + @web.expose_api + def update( self, trans, id, key, payload, **kwd ): + """ + PUT /api/requests/{encoded_request_id} + Displays information about a sequencing request. + """ + params = util.Params( kwd ) + update_type = None + if 'update_type' not in payload: + trans.response.status = 400 + return "Missing required 'update_type' parameter. Please consult the API documentation for help." + else: + update_type = payload.pop( 'update_type' ) + if update_type not in self.update_type_values: + trans.response.status = 400 + return "Invalid value for 'update_type' parameter ( %s ) specified. Please consult the API documentation for help." % update_type + try: + request_id = trans.security.decode_id( id ) + except TypeError: + trans.response.status = 400 + return "Malformed %s id ( %s ) specified, unable to decode." % ( update_type, str( id ) ) + try: + request = trans.sa_session.query( trans.app.model.Request ).get( request_id ) + except: + request = None + if not request or not ( trans.user_is_admin() or request.user.id == trans.user.id ): + trans.response.status = 400 + return "Invalid request id ( %s ) specified." % str( request_id ) + # check update type + if update_type == 'request_state': + return self.__update_request_state( trans, encoded_request_id=id ) + elif update_type == 'sample_state': + return self.__update_sample_state( trans, request.type, **payload ) + elif update_type == 'sample_dataset_transfer_status': + # update sample_dataset transfer status + return self.__update_sample_dataset_status( trans, **payload ) + + + def __update_request_state( self, trans, encoded_request_id ): + requests_common_cntrller = trans.webapp.controllers['requests_common'] + status, output = requests_common_cntrller.update_request_state( trans, + cntrller='api', + request_id=encoded_request_id ) + return status, output + def __update_sample_state( self, trans, request_type, **payload ): + # only admin user may update sample state in Galaxy sample tracking + if not trans.user_is_admin(): + trans.response.status = 400 + return "only an admin user may update sample state in Galaxy sample tracking." + if 'sample_ids' not in payload or 'new_state' not in payload: + trans.response.status = 400 + return "Missing one or more required parameters: 'sample_ids' and 'new_state'." + sample_ids = payload.pop( 'sample_ids' ) + new_state_name = payload.pop( 'new_state' ) + comment = payload.get( 'comment', '' ) + # check if the new state is a valid sample state + possible_states = request_type.states + new_state = None + for state in possible_states: + if state.name == new_state_name: + new_state = state + if not new_state: + trans.response.status = 400 + return "Invalid sample state requested ( %s )." % new_state + requests_common_cntrller = trans.webapp.controllers['requests_common'] + status, output = requests_common_cntrller.update_sample_state( trans, + cntrller='api', + sample_ids=sample_ids, + new_state=new_state, + comment=comment ) + return status, output + + def __update_sample_dataset_status( self, trans, **payload ): + # only admin user may transfer sample datasets in Galaxy sample tracking + if not trans.user_is_admin(): + trans.response.status = 400 + return "Only an admin user may transfer sample datasets in Galaxy sample tracking and thus update transfer status." + if 'sample_dataset_ids' not in payload or 'new_status' not in payload: + trans.response.status = 400 + return "Missing one or more required parameters: 'sample_dataset_ids' and 'new_status'." + sample_dataset_ids = payload.pop( 'sample_dataset_ids' ) + new_status = payload.pop( 'new_status' ) + error_msg = payload.get( 'error_msg', '' ) + requests_admin_cntrller = trans.webapp.controllers['requests_admin'] + status, output = requests_admin_cntrller.update_sample_dataset_status( trans, + cntrller='api', + sample_dataset_ids=sample_dataset_ids, + new_status=new_status, + error_msg=error_msg ) + return status, output + + + + --- a/templates/admin/requests/dataset.mako +++ b/templates/admin/requests/dataset.mako @@ -58,7 +58,7 @@ <label>Transfer status:</label><div style="float: left; width: 250px; margin-right: 10px;"> ${sample_dataset.status} - %if sample_dataset.status == sample.transfer_status.ERROR: + %if sample_dataset.status == trans.app.model.SampleDataset.transfer_status.ERROR: <br/> ${sample_dataset.error_msg} %endif --- a/scripts/api/display.py +++ b/scripts/api/display.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -import os, sys +import os, sys, urllib2 sys.path.insert( 0, os.path.dirname( __file__ ) ) from common import display @@ -9,3 +9,6 @@ try: except TypeError: print 'usage: %s key url' % os.path.basename( sys.argv[0] ) sys.exit( 1 ) +except urllib2.URLError, e: + print str(e) + sys.exit( 1 )